Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Don't re-use timestamps
Merged did:key:z6MksFqX...wzpT opened 2 years ago

There are rare cases where ref announcements may be sent in close succession, and the current clock time is re-used. This will cause the second (newer) announcement to be ignored by peers.

We add a timestamp method that checks if the timestamp is re-used, and increments it if so, ensuring that we always use a fresh one.

16 files changed +263 -134 7fc3b73e 2b771921
modified radicle-cli/tests/commands.rs
@@ -1221,7 +1221,7 @@ fn rad_clone_partial_fail() {
            node::Features::SEED,
            Alias::new("carol"),
            0,
-
            localtime::LocalTime::now().as_secs(),
+
            localtime::LocalTime::now().into(),
            [node::KnownAddress::new(
                // Eve will fail to connect to this address.
                node::Address::from(net::SocketAddr::from(([0, 0, 0, 0], 19873))),
@@ -1231,7 +1231,7 @@ fn rad_clone_partial_fail() {
        .unwrap();
    eve.db
        .routing_mut()
-
        .insert([&acme], carol, localtime::LocalTime::now().as_secs())
+
        .insert([&acme], carol, localtime::LocalTime::now().into())
        .unwrap();
    eve.config.peers = node::config::PeerConfig::Static;

@@ -1263,7 +1263,7 @@ fn rad_clone_connect() {
    let bob = environment.node(Config::test(Alias::new("bob")));
    let mut eve = environment.node(Config::test(Alias::new("eve")));
    let acme = RepoId::from_str("z42hL2jL4XNk6K8oHQaSWfMgCL7ji").unwrap();
-
    let now = localtime::LocalTime::now().as_secs();
+
    let now = localtime::LocalTime::now().into();

    fixtures::repository(working.join("acme"));

modified radicle-node/src/runtime.rs
@@ -176,9 +176,7 @@ impl Runtime {
                // If our announcement was made some time ago, the timestamp on it will be old,
                // and it might not get gossiped to new nodes since it will be purged from caches.
                // Therefore, we make sure it's never too old.
-
                if clock.as_millis() - ann.timestamp
-
                    <= config.limits.gossip_max_age.as_millis() as u64
-
                {
+
                if clock - ann.timestamp.to_local_time() <= config.limits.gossip_max_age {
                    Some(ann)
                } else {
                    None
@@ -202,7 +200,7 @@ impl Runtime {
            );
            ann
        } else {
-
            service::gossip::node(&config, clock.as_millis())
+
            service::gossip::node(&config, clock.into())
                .solve(Default::default())
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };
@@ -218,7 +216,7 @@ impl Runtime {
                    radicle::node::Features::SEED,
                    alias,
                    0,
-
                    clock.as_millis(),
+
                    clock.into(),
                    [node::KnownAddress::new(addr, address::Source::Bootstrap)],
                )?;
            }
modified radicle-node/src/service.rs
@@ -393,8 +393,10 @@ pub struct Service<D, S, G> {
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
-
    /// Last time the service announced its inventory.
+
    /// Last time the inventory was announced.
    last_announce: LocalTime,
+
    /// Last timestamp used for announcements.
+
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
    /// Publishes events to subscribers.
@@ -455,6 +457,7 @@ where
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
+
            last_timestamp: Timestamp::MIN,
            last_announce: LocalTime::default(),
            started_at: None,
            emitter,
@@ -598,7 +601,7 @@ where
        // all of it. It can happen that inventory is not properly seeded if for eg. the
        // user creates a new repository while the node is stopped.
        let rids = self.storage.inventory()?;
-
        self.db.routing_mut().insert(&rids, nid, time.as_millis())?;
+
        self.db.routing_mut().insert(&rids, nid, time.into())?;

        let announced = self
            .db
@@ -629,7 +632,7 @@ where
                &rid,
                &nid,
                updated_at.oid,
-
                updated_at.timestamp.as_millis(),
+
                updated_at.timestamp.into(),
            )? {
                debug!(target: "service", "Saved local sync status for {rid}..");
            }
@@ -719,7 +722,7 @@ where
            if let Err(err) = self
                .db
                .gossip_mut()
-
                .prune((now - self.config.limits.gossip_max_age).as_millis())
+
                .prune((now - self.config.limits.gossip_max_age).into())
            {
                error!(target: "service", "Error pruning gossip entries: {err}");
            }
@@ -779,7 +782,7 @@ where

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
-
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
+
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
@@ -1044,7 +1047,7 @@ where
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                // Update our routing table in case this fetch was user-initiated and doesn't
                // come from an announcement.
-
                self.seed_discovered(rid, remote, self.time());
+
                self.seed_discovered(rid, remote, self.clock.into());

                for update in &updated {
                    if update.is_skipped() {
@@ -1169,14 +1172,17 @@ where
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);
-
        let now = self.time();

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);

-
                if let Err(e) = self.db.addresses_mut().connected(&remote, &peer.addr, now) {
+
                if let Err(e) =
+
                    self.db
+
                        .addresses_mut()
+
                        .connected(&remote, &peer.addr, self.clock.into())
+
                {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
@@ -1360,7 +1366,7 @@ where
        match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(fresh) => {
                if !fresh {
-
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={})", self.time());
+
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                    return Ok(false);
                }
            }
@@ -1752,9 +1758,10 @@ where
    }

    /// Set of initial messages to send to a peer.
-
    fn initial(&self, _link: Link) -> Vec<Message> {
-
        let filter = self.filter();
+
    fn initial(&mut self, _link: Link) -> Vec<Message> {
+
        let timestamp = self.timestamp();
        let now = self.clock();
+
        let filter = self.filter();
        let inventory = match self.storage.inventory() {
            Ok(i) => i,
            Err(e) => {
@@ -1775,8 +1782,8 @@ where
        // If this is our first connection to the network, we just ask for a fixed backlog
        // of messages to get us started.
        let since = match self.db.gossip().last() {
-
            Ok(Some(last)) => last - MAX_TIME_DELTA.as_millis() as Timestamp,
-
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).as_millis() as Timestamp,
+
            Ok(Some(last)) => Timestamp::from(last.to_local_time() - MAX_TIME_DELTA),
+
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into(),
            Err(e) => {
                error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
                return vec![];
@@ -1787,7 +1794,7 @@ where

        vec![
            Message::node(self.node.clone(), &self.signer),
-
            Message::inventory(gossip::inventory(now.as_millis(), inventory), &self.signer),
+
            Message::inventory(gossip::inventory(timestamp, inventory), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }
@@ -1804,7 +1811,7 @@ where
    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
-
        let result = self.sync_routing(inventory, self.node_id(), self.time())?;
+
        let result = self.sync_routing(inventory, self.node_id(), self.clock.into())?;

        Ok(result)
    }
@@ -1860,12 +1867,12 @@ where

    /// Return a refs announcement including the given remotes.
    fn refs_announcement_for(
-
        &self,
+
        &mut self,
        rid: RepoId,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
-
        let timestamp = self.time();
+
        let timestamp = self.timestamp();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
@@ -1891,8 +1898,7 @@ where

    /// Announce our own refs for the given repo.
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc<Verified>) -> Result<Vec<RefsAt>, Error> {
-
        let refs = self.announce_refs(rid, doc, [self.node_id()])?;
-
        let now = self.local_time();
+
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
@@ -1901,13 +1907,15 @@ where
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
                refs: r,
-
                timestamp: now.as_millis(),
+
                timestamp,
            });
-
            if let Err(e) =
-
                self.database_mut()
-
                    .refs_mut()
-
                    .set(&rid, &r.remote, &SIGREFS_BRANCH, r.at, now)
-
            {
+
            if let Err(e) = self.database_mut().refs_mut().set(
+
                &rid,
+
                &r.remote,
+
                &SIGREFS_BRANCH,
+
                r.at,
+
                timestamp.to_local_time(),
+
            ) {
                error!(
                    target: "service",
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
@@ -1924,9 +1932,10 @@ where
        rid: RepoId,
        doc: Doc<Verified>,
        remotes: impl IntoIterator<Item = NodeId>,
-
    ) -> Result<Vec<RefsAt>, Error> {
-
        let peers = self.sessions.connected().map(|(_, p)| p);
+
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
+
        let timestamp = ann.timestamp();
+
        let peers = self.sessions.connected().map(|(_, p)| p);

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
@@ -1934,14 +1943,14 @@ where
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
            info!(
                target: "service",
-
                "Announcing own refs for {rid} to peers ({}) (t={})..",
-
                refs.at, ann.timestamp()
+
                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
+
                refs.at
            );

            if let Err(e) = self
                .db
                .seeds_mut()
-
                .synced(&rid, &ann.node, refs.at, ann.timestamp())
+
                .synced(&rid, &ann.node, refs.at, timestamp)
            {
                error!(target: "service", "Error updating sync status for local node: {e}");
            }
@@ -1955,7 +1964,7 @@ where
            }),
            self.db.gossip_mut(),
        );
-
        Ok(refs)
+
        Ok((refs, timestamp))
    }

    fn sync_and_announce_inventory(&mut self) {
@@ -2004,9 +2013,9 @@ where
            return false;
        }
        let persistent = self.config.is_persistent(&nid);
-
        let time = self.time();
+
        let timestamp: Timestamp = self.clock.into();

-
        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, time) {
+
        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
            error!(target: "service", "Error updating address book with connection attempt: {e}");
        }
        self.sessions.insert(
@@ -2079,9 +2088,16 @@ where
        }
    }

-
    /// Get the current time.
-
    fn time(&self) -> Timestamp {
-
        self.clock.as_millis()
+
    /// Get a timestamp for using in announcements.
+
    /// Never returns the same timestamp twice.
+
    fn timestamp(&mut self) -> Timestamp {
+
        let now = Timestamp::from(self.clock);
+
        if *now > *self.last_timestamp {
+
            self.last_timestamp = now;
+
        } else {
+
            self.last_timestamp = self.last_timestamp + 1;
+
        }
+
        self.last_timestamp
    }

    ////////////////////////////////////////////////////////////////////////////
@@ -2090,23 +2106,7 @@ where

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Inventory) -> Result<(), storage::Error> {
-
        let time = if self.clock > self.last_announce {
-
            self.clock.as_millis()
-
        } else if self.last_announce - self.clock < LocalDuration::from_secs(1) {
-
            // In rare cases where the inventory is updated very quickly, we want to make sure all
-
            // announcements carry an increasing timestamp. We allow our timestamps to be up to
-
            // one second in the future.
-
            self.last_announce.as_millis() + 1
-
        } else {
-
            // Announcement is considered redundant, ignore. Nb. This should not happen unless
-
            // you are trying to spam inventories.
-
            log::warn!(
-
                target: "service",
-
                "Ignored outgoing inventory announcement with {} items",
-
                inventory.len()
-
            );
-
            return Ok(());
-
        };
+
        let time = self.timestamp();
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));

        self.outbox.announce(
@@ -2114,7 +2114,7 @@ where
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
-
        self.last_announce = LocalTime::from_millis(time as u128);
+
        self.last_announce = time.to_local_time();

        Ok(())
    }
@@ -2127,7 +2127,7 @@ where

        let delta = count - self.config.limits.routing_max_size;
        self.db.routing_mut().prune(
-
            (*now - self.config.limits.routing_max_age).as_millis(),
+
            (*now - self.config.limits.routing_max_age).into(),
            Some(delta),
        )?;
        Ok(())
modified radicle-node/src/service/gossip/store.rs
@@ -1,3 +1,4 @@
+
use std::num::TryFromIntError;
use std::{fmt, io};

use radicle::crypto::Signature;
@@ -17,6 +18,9 @@ pub enum Error {
    /// An Internal error.
    #[error("internal error: {0}")]
    Internal(#[from] sql::Error),
+
    /// Unit overflow.
+
    #[error("unit overflow:: {0}")]
+
    UnitOverflow(#[from] TryFromIntError),
}

/// A database that has access to historical gossip messages.
@@ -54,7 +58,7 @@ impl Store for Database {
            .db
            .prepare("DELETE FROM `announcements` WHERE timestamp < ?1")?;

-
        stmt.bind((1, cutoff.try_into().unwrap_or(i64::MAX)))?;
+
        stmt.bind((1, &cutoff))?;
        stmt.next()?;

        Ok(self.db.change_count())
@@ -66,9 +70,10 @@ impl Store for Database {
            .prepare("SELECT MAX(timestamp) AS latest FROM `announcements`")?;

        if let Some(Ok(row)) = stmt.into_iter().next() {
-
            let latest = row.try_read::<Option<i64>, _>(0)?;
-

-
            return Ok(latest.map(|l| l as Timestamp));
+
            return match row.try_read::<Option<i64>, _>(0)? {
+
                Some(i) => Ok(Some(Timestamp::from(u64::try_from(i)?))),
+
                None => Ok(None),
+
            };
        }
        Ok(None)
    }
@@ -101,7 +106,7 @@ impl Store for Database {
            }
        }
        stmt.bind((5, &ann.signature))?;
-
        stmt.bind((6, ann.message.timestamp().try_into().unwrap_or(i64::MAX)))?;
+
        stmt.bind((6, &ann.message.timestamp()))?;
        stmt.next()?;

        Ok(self.db.change_count() > 0)
@@ -119,10 +124,10 @@ impl Store for Database {
             WHERE timestamp >= ?1 and timestamp < ?2
             ORDER BY timestamp, node, type",
        )?;
-
        assert!(from <= to);
+
        assert!(*from <= *to);

-
        stmt.bind((1, i64::try_from(from).unwrap_or(i64::MAX)))?;
-
        stmt.bind((2, i64::try_from(to).unwrap_or(i64::MAX)))?;
+
        stmt.bind((1, &from))?;
+
        stmt.bind((2, &to))?;

        Ok(Box::new(
            stmt.into_iter()
@@ -145,9 +150,9 @@ impl Store for Database {
                        }
                    };
                    let signature = row.read::<Signature, _>("signature");
-
                    let timestamp = row.read::<i64, _>("timestamp");
+
                    let timestamp = row.read::<Timestamp, _>("timestamp");

-
                    debug_assert_eq!(timestamp, message.timestamp() as i64);
+
                    debug_assert_eq!(timestamp, message.timestamp());

                    Ok(Announcement {
                        node,
modified radicle-node/src/service/message.rs
@@ -601,7 +601,7 @@ mod tests {
        let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs,
-
            timestamp: LocalTime::now().as_millis(),
+
            timestamp: LocalTime::now().into(),
        })
        .signed(&MockSigner::default())
        .into();
@@ -621,7 +621,7 @@ mod tests {
                inventory: arbitrary::vec(INVENTORY_LIMIT)
                    .try_into()
                    .expect("size within bounds limit"),
-
                timestamp: LocalTime::now().as_millis(),
+
                timestamp: LocalTime::now().into(),
            },
            &MockSigner::default(),
        );
@@ -646,7 +646,7 @@ mod tests {
    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
-
        let timestamp = 0;
+
        let timestamp = Timestamp::EPOCH;
        let at = raw::Oid::zero().into();
        let refs = BoundedVec::collect_from(
            &mut [RefsAt {
@@ -669,7 +669,7 @@ mod tests {
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            features: node::Features::SEED,
-
            timestamp: 42491841,
+
            timestamp: Timestamp::from(42491841),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
modified radicle-node/src/service/session.rs
@@ -6,7 +6,7 @@ use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, LocalTime, NodeId, Outbox, RepoId, Rng};
-
use crate::Link;
+
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};

@@ -15,7 +15,7 @@ pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
    /// for eg. a timestamp far in the future.
    #[error("invalid announcement timestamp: {0}")]
-
    InvalidTimestamp(u64),
+
    InvalidTimestamp(Timestamp),
    /// The remote peer sent git protocol messages while we were expecting
    /// gossip messages. Or vice-versa.
    #[error("protocol mismatch")]
modified radicle-node/src/test/gossip.rs
@@ -29,7 +29,7 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa
        msgs.push(Message::node(
            NodeAnnouncement {
                features: node::Features::SEED,
-
                timestamp: time.as_millis(),
+
                timestamp: time.into(),
                alias: node::Alias::new(gen::string(5)),
                addresses: None.into(),
                nonce: 0,
@@ -41,7 +41,7 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa
        msgs.push(Message::inventory(
            InventoryAnnouncement {
                inventory: arbitrary::vec(3).try_into().unwrap(),
-
                timestamp: time.as_millis(),
+
                timestamp: time.into(),
            },
            &signer,
        ));
modified radicle-node/src/test/peer.rs
@@ -174,7 +174,7 @@ where
        for rid in storage.inventory().unwrap() {
            policies.seed(&rid, Scope::Followed).unwrap();
        }
-
        let announcement = service::gossip::node(&config.config, config.local_time.as_secs());
+
        let announcement = service::gossip::node(&config.config, config.local_time.into());
        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
            config.config,
@@ -249,7 +249,7 @@ where
    }

    pub fn timestamp(&self) -> Timestamp {
-
        self.clock().as_millis()
+
        (*self.clock()).into()
    }

    pub fn inventory(&self) -> Inventory {
modified radicle-node/src/tests.rs
@@ -60,7 +60,7 @@ use crate::{git, identity, rad, runtime, service, test};
#[test]
fn test_inventory_decode() {
    let inventory: Vec<RepoId> = arbitrary::gen(300);
-
    let timestamp = LocalTime::now().as_millis();
+
    let timestamp: Timestamp = LocalTime::now().into();

    let mut buf = Vec::new();
    inventory.as_slice().encode(&mut buf).unwrap();
@@ -260,7 +260,7 @@ fn test_inventory_sync() {
    let bob_signer = MockSigner::default();
    let bob_storage = fixtures::storage(tmp.path().join("bob"), &bob_signer).unwrap();
    let bob = Peer::config("bob", [8, 8, 8, 8], bob_storage, peer::Config::default());
-
    let now = LocalTime::now().as_millis();
+
    let now = LocalTime::now().into();
    let projs = bob.storage().inventory().unwrap();

    alice.connect_to(&bob);
@@ -374,7 +374,7 @@ fn test_inventory_pruning() {
                        inventory: test::arbitrary::vec::<RepoId>(num_projs)
                            .try_into()
                            .unwrap(),
-
                        timestamp: bob.local_time().as_millis(),
+
                        timestamp: bob.local_time().into(),
                    },
                    peer.signer(),
                ),
@@ -567,8 +567,8 @@ fn test_announcement_rebroadcast_timestamp_filtered() {
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
-
            since: alice.local_time().as_millis(),
-
            until: (alice.local_time() + delta).as_millis(),
+
            since: alice.local_time().into(),
+
            until: (alice.local_time() + delta).into(),
        }),
    );

@@ -946,7 +946,7 @@ fn test_inventory_relay() {
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let inv = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
-
    let now = LocalTime::now().as_millis();
+
    let now = LocalTime::now().into();

    // Inventory from Bob relayed to Eve.
    alice.init();
@@ -1274,7 +1274,7 @@ fn test_seed_repo_subscribe() {
            filter,
            since,
            ..
-
        })) if since == alice.clock().as_millis() && filter.contains(&rid)
+
        })) if since == alice.timestamp() && filter.contains(&rid)
    );
}

@@ -1292,7 +1292,7 @@ fn test_fetch_missing_inventory_on_gossip() {
        Message::inventory(
            InventoryAnnouncement {
                inventory: vec![rid].try_into().unwrap(),
-
                timestamp: now.as_millis(),
+
                timestamp: now.into(),
            },
            bob.signer(),
        ),
@@ -1317,7 +1317,7 @@ fn test_fetch_missing_inventory_on_schedule() {
        Message::inventory(
            InventoryAnnouncement {
                inventory: vec![rid].try_into().unwrap(),
-
                timestamp: now.as_millis(),
+
                timestamp: now.into(),
            },
            bob.signer(),
        ),
modified radicle-node/src/wire.rs
@@ -28,6 +28,7 @@ use crate::service::filter;
use crate::storage::refs::Refs;
use crate::storage::refs::RefsAt;
use crate::storage::refs::SignedRefs;
+
use crate::Timestamp;

/// The default type we use to represent sizes on the wire.
///
@@ -527,6 +528,20 @@ impl Decode for tor::OnionAddrV3 {
    }
}

+
impl Encode for Timestamp {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.deref().encode(writer)
+
    }
+
}
+

+
impl Decode for Timestamp {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let millis = u64::decode(reader)?;
+

+
        Ok(Timestamp::from(millis))
+
    }
+
}
+

#[cfg(test)]
mod tests {
    use super::*;
modified radicle/src/node.rs
@@ -11,6 +11,7 @@ pub mod policy;
pub mod refs;
pub mod routing;
pub mod seed;
+
pub mod timestamp;

use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader};
@@ -41,6 +42,7 @@ pub use db::Database;
pub use events::{Event, Events};
pub use features::Features;
pub use seed::SyncedAt;
+
pub use timestamp::Timestamp;

/// Default name for control socket file.
pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
@@ -65,9 +67,6 @@ pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire.debug";
#[cfg(not(debug_assertions))]
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire";

-
/// Milliseconds since epoch.
-
pub type Timestamp = u64;
-

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
    #[default]
modified radicle/src/node/address/store.rs
@@ -89,7 +89,7 @@ impl Store for Database {
        if let Some(Ok(row)) = stmt.into_iter().next() {
            let features = row.read::<node::Features, _>("features");
            let alias = Alias::from_str(row.read::<&str, _>("alias"))?;
-
            let timestamp = row.read::<i64, _>("timestamp") as Timestamp;
+
            let timestamp = row.read::<Timestamp, _>("timestamp");
            let pow = row.read::<i64, _>("pow") as u32;
            let penalty = row.read::<i64, _>("penalty").min(u8::MAX as i64);
            let penalty = Penalty(penalty as u8);
@@ -185,7 +185,7 @@ impl Store for Database {
            stmt.bind((2, features))?;
            stmt.bind((3, sql::Value::String(alias.into())))?;
            stmt.bind((4, pow as i64))?;
-
            stmt.bind((5, timestamp as i64))?;
+
            stmt.bind((5, &timestamp))?;
            stmt.next()?;

            for addr in addrs {
@@ -200,7 +200,7 @@ impl Store for Database {
                stmt.bind((2, AddressType::from(&addr.addr)))?;
                stmt.bind((3, &addr.addr))?;
                stmt.bind((4, addr.source))?;
-
                stmt.bind((5, timestamp as i64))?;
+
                stmt.bind((5, &timestamp))?;
                stmt.next()?;
            }
            Ok::<_, Error>(db.change_count() > 0)
@@ -265,7 +265,7 @@ impl Store for Database {
             AND value = ?4",
        )?;

-
        stmt.bind((1, time as i64))?;
+
        stmt.bind((1, &time))?;
        stmt.bind((2, nid))?;
        stmt.bind((3, AddressType::from(addr)))?;
        stmt.bind((4, addr))?;
@@ -284,7 +284,7 @@ impl Store for Database {
                 AND value = ?4",
            )?;

-
            stmt.bind((1, time as i64))?;
+
            stmt.bind((1, &time))?;
            stmt.bind((2, nid))?;
            stmt.bind((3, AddressType::from(addr)))?;
            stmt.bind((4, addr))?;
@@ -435,7 +435,7 @@ mod test {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut cache = Database::memory().unwrap();
        let features = node::Features::SEED;
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = Timestamp::from(LocalTime::now());

        cache
            .insert(&alice, features, Alias::new("alice"), 16, timestamp, [])
@@ -455,7 +455,7 @@ mod test {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut cache = Database::memory().unwrap();
        let features = node::Features::SEED;
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = LocalTime::now().into();

        let ka = KnownAddress {
            addr: net::SocketAddr::from(([4, 4, 4, 4], 8776)).into(),
@@ -490,7 +490,7 @@ mod test {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut cache = Database::memory().unwrap();
        let features = node::Features::SEED;
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = LocalTime::now().into();
        let alias = Alias::new("alice");

        let ka = KnownAddress {
@@ -517,7 +517,7 @@ mod test {
    fn test_insert_and_update() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut cache = Database::memory().unwrap();
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = LocalTime::now().into();
        let features = node::Features::SEED;
        let alias1 = Alias::new("alice");
        let alias2 = Alias::new("~alice~");
@@ -572,7 +572,7 @@ mod test {
        let alice = arbitrary::gen::<NodeId>(1);
        let bob = arbitrary::gen::<NodeId>(1);
        let mut cache = Database::memory().unwrap();
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = LocalTime::now().into();
        let features = node::Features::SEED;
        let alice_alias = Alias::new("alice");
        let bob_alias = Alias::new("bob");
@@ -620,7 +620,7 @@ mod test {
        let mut rng = fastrand::Rng::new();
        let mut cache = Database::memory().unwrap();
        let mut expected = Vec::new();
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = LocalTime::now().into();
        let features = node::Features::SEED;
        let alias = Alias::new("alice");

@@ -660,7 +660,7 @@ mod test {
        let addr = arbitrary::gen::<Address>(1);
        let mut cache = Database::memory().unwrap();
        let features = node::Features::SEED;
-
        let timestamp = LocalTime::now().as_millis();
+
        let timestamp = Timestamp::from(LocalTime::now());

        cache
            .insert(&alice, features, Alias::new("alice"), 16, timestamp, [])
modified radicle/src/node/routing.rs
@@ -97,7 +97,7 @@ impl Store for Database {
        stmt.bind((2, node))?;

        if let Some(Ok(row)) = stmt.into_iter().next() {
-
            return Ok(Some(row.read::<i64, _>("timestamp") as Timestamp));
+
            return Ok(Some(row.read::<Timestamp, _>("timestamp")));
        }
        Ok(None)
    }
@@ -108,7 +108,6 @@ impl Store for Database {
        node: NodeId,
        time: Timestamp,
    ) -> Result<Vec<(RepoId, InsertResult)>, Error> {
-
        let time: i64 = time.try_into().map_err(|_| Error::UnitOverflow)?;
        let mut results = Vec::new();

        transaction(&self.db, |db| {
@@ -130,7 +129,7 @@ impl Store for Database {

                stmt.bind((1, id))?;
                stmt.bind((2, &node))?;
-
                stmt.bind((3, time))?;
+
                stmt.bind((3, &time))?;
                stmt.next()?;

                let result = match (self.db.change_count() > 0, existed) {
@@ -184,7 +183,6 @@ impl Store for Database {
    }

    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error> {
-
        let oldest: i64 = oldest.try_into().map_err(|_| Error::UnitOverflow)?;
        let limit: i64 = limit
            .unwrap_or(i64::MAX as usize)
            .try_into()
@@ -194,7 +192,7 @@ impl Store for Database {
            "DELETE FROM routing WHERE rowid IN
            (SELECT rowid FROM routing WHERE timestamp < ? LIMIT ?)",
        )?;
-
        stmt.bind((1, oldest))?;
+
        stmt.bind((1, &oldest))?;
        stmt.bind((2, limit))?;
        stmt.next()?;

@@ -243,7 +241,7 @@ mod test {

        for node in &nodes {
            assert_eq!(
-
                db.insert(&ids, *node, 0).unwrap(),
+
                db.insert(&ids, *node, Timestamp::EPOCH).unwrap(),
                ids.iter()
                    .map(|id| (*id, InsertResult::SeedAdded))
                    .collect::<Vec<_>>()
@@ -265,7 +263,7 @@ mod test {
        let mut db = database(":memory:");

        for node in &nodes {
-
            db.insert(&ids, *node, 0).unwrap();
+
            db.insert(&ids, *node, Timestamp::EPOCH).unwrap();
        }

        for node in &nodes {
@@ -284,7 +282,7 @@ mod test {

        for node in &nodes {
            assert!(db
-
                .insert(&ids, *node, 0)
+
                .insert(&ids, *node, Timestamp::EPOCH)
                .unwrap()
                .iter()
                .all(|(_, r)| *r == InsertResult::SeedAdded));
@@ -306,7 +304,7 @@ mod test {
        let mut db = database(":memory:");

        for node in &nodes {
-
            db.insert(&ids, *node, 0).unwrap();
+
            db.insert(&ids, *node, Timestamp::EPOCH).unwrap();
        }
        for id in &ids {
            for node in &nodes {
@@ -325,15 +323,15 @@ mod test {
        let mut db = database(":memory:");

        assert_eq!(
-
            db.insert([&id], node, 0).unwrap(),
+
            db.insert([&id], node, Timestamp::EPOCH).unwrap(),
            vec![(id, InsertResult::SeedAdded)]
        );
        assert_eq!(
-
            db.insert([&id], node, 0).unwrap(),
+
            db.insert([&id], node, Timestamp::EPOCH).unwrap(),
            vec![(id, InsertResult::NotUpdated)]
        );
        assert_eq!(
-
            db.insert([&id], node, 0).unwrap(),
+
            db.insert([&id], node, Timestamp::EPOCH).unwrap(),
            vec![(id, InsertResult::NotUpdated)]
        );
    }
@@ -345,14 +343,14 @@ mod test {
        let mut db = database(":memory:");

        assert_eq!(
-
            db.insert([&id], node, 0).unwrap(),
+
            db.insert([&id], node, Timestamp::EPOCH).unwrap(),
            vec![(id, InsertResult::SeedAdded)]
        );
        assert_eq!(
-
            db.insert([&id], node, 1).unwrap(),
+
            db.insert([&id], node, Timestamp::from(1)).unwrap(),
            vec![(id, InsertResult::TimeUpdated)]
        );
-
        assert_eq!(db.entry(&id, &node).unwrap(), Some(1));
+
        assert_eq!(db.entry(&id, &node).unwrap(), Some(Timestamp::from(1)));
    }

    #[test]
@@ -363,18 +361,18 @@ mod test {
        let mut db = database(":memory:");

        assert_eq!(
-
            db.insert([&id1], node, 0).unwrap(),
+
            db.insert([&id1], node, Timestamp::EPOCH).unwrap(),
            vec![(id1, InsertResult::SeedAdded)]
        );
        assert_eq!(
-
            db.insert([&id1, &id2], node, 0).unwrap(),
+
            db.insert([&id1, &id2], node, Timestamp::EPOCH).unwrap(),
            vec![
                (id1, InsertResult::NotUpdated),
                (id2, InsertResult::SeedAdded)
            ]
        );
        assert_eq!(
-
            db.insert([&id1, &id2], node, 1).unwrap(),
+
            db.insert([&id1, &id2], node, Timestamp::from(1)).unwrap(),
            vec![
                (id1, InsertResult::TimeUpdated),
                (id2, InsertResult::TimeUpdated)
@@ -389,7 +387,7 @@ mod test {
        let mut db = database(":memory:");

        assert_eq!(
-
            db.insert([&id], node, 0).unwrap(),
+
            db.insert([&id], node, Timestamp::EPOCH).unwrap(),
            vec![(id, InsertResult::SeedAdded)]
        );
        assert!(db.remove(&id, &node).unwrap());
@@ -402,7 +400,7 @@ mod test {
        let ids = arbitrary::vec::<RepoId>(10);
        let node = arbitrary::gen(1);

-
        db.insert(&ids, node, LocalTime::now().as_millis()).unwrap();
+
        db.insert(&ids, node, LocalTime::now().into()).unwrap();

        assert_eq!(10, db.len().unwrap(), "correct number of rows in table");
    }
@@ -417,7 +415,7 @@ mod test {

        for node in &nodes {
            let time = rng.u64(..now.as_millis());
-
            db.insert(&ids, *node, time).unwrap();
+
            db.insert(&ids, *node, Timestamp::from(time)).unwrap();
        }

        let ids = arbitrary::vec::<RepoId>(10);
@@ -425,16 +423,16 @@ mod test {

        for node in &nodes {
            let time = rng.u64(now.as_millis()..i64::MAX as u64);
-
            db.insert(&ids, *node, time).unwrap();
+
            db.insert(&ids, *node, Timestamp::from(time)).unwrap();
        }

-
        let pruned = db.prune(now.as_millis(), None).unwrap();
+
        let pruned = db.prune(now.into(), None).unwrap();
        assert_eq!(pruned, ids.len() * nodes.len());

        for id in &ids {
            for node in &nodes {
                let t = db.entry(id, node).unwrap().unwrap();
-
                assert!(t >= now.as_millis());
+
                assert!(*t >= *Timestamp::from(now));
            }
        }
    }
@@ -446,7 +444,7 @@ mod test {
        let mut db = database(":memory:");

        for node in &nodes {
-
            db.insert([&id], *node, 0).unwrap();
+
            db.insert([&id], *node, Timestamp::EPOCH).unwrap();
        }
        assert_eq!(db.count(&id).unwrap(), nodes.len());
    }
modified radicle/src/node/seed/store.rs
@@ -64,7 +64,7 @@ impl Store for Database {
        stmt.bind((1, rid))?;
        stmt.bind((2, nid))?;
        stmt.bind((3, at.to_string().as_str()))?;
-
        stmt.bind((4, timestamp as i64))?;
+
        stmt.bind((4, &timestamp))?;
        stmt.next()?;

        Ok(self.db.change_count() > 0)
added radicle/src/node/timestamp.rs
@@ -0,0 +1,108 @@
+
use std::{
+
    fmt,
+
    ops::{Add, Deref, Sub},
+
};
+

+
use localtime::LocalTime;
+
use sqlite as sql;
+

+
/// Milliseconds since epoch.
+
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
#[serde(transparent)]
+
pub struct Timestamp(u64);
+

+
impl Add<u64> for Timestamp {
+
    type Output = Timestamp;
+

+
    fn add(self, millis: u64) -> Self::Output {
+
        Self(self.0 + millis)
+
    }
+
}
+

+
impl Sub<u64> for Timestamp {
+
    type Output = Timestamp;
+

+
    fn sub(self, millis: u64) -> Self::Output {
+
        Self(self.0 - millis)
+
    }
+
}
+

+
impl Timestamp {
+
    /// UNIX epoch.
+
    pub const EPOCH: Self = Self(0);
+
    /// Minimum value.
+
    pub const MIN: Self = Self(0);
+
    /// Maximum value.
+
    // Nb. This is the maximum value that can fit in a signed 64-bit integer (`i64`).
+
    // This makes it possible to store timestamps in sqlite.
+
    pub const MAX: Self = Self(9223372036854775807);
+

+
    /// Convert to local time.
+
    pub fn to_local_time(&self) -> LocalTime {
+
        (*self).into()
+
    }
+
}
+

+
impl fmt::Display for Timestamp {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        self.0.fmt(f)
+
    }
+
}
+

+
impl Deref for Timestamp {
+
    type Target = u64;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl From<LocalTime> for Timestamp {
+
    fn from(t: LocalTime) -> Self {
+
        Self(t.as_millis())
+
    }
+
}
+

+
impl From<Timestamp> for LocalTime {
+
    fn from(t: Timestamp) -> Self {
+
        LocalTime::from_millis(t.0 as u128)
+
    }
+
}
+

+
impl From<u64> for Timestamp {
+
    fn from(u: u64) -> Self {
+
        Self(u)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for Timestamp {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::Integer(i) => match (*i).try_into() {
+
                Ok(u) => Ok(Timestamp(u)),
+
                Err(e) => Err(sql::Error {
+
                    code: None,
+
                    message: Some(format!("sql: invalid integer for timestamp: {e}")),
+
                }),
+
            },
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for timestamp".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &Timestamp {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        match i64::try_from(*self.deref()) {
+
            Ok(integer) => integer.bind(stmt, i),
+
            Err(e) => Err(sql::Error {
+
                code: None,
+
                message: Some(format!("sql: invalid timestamp: {e}")),
+
            }),
+
        }
+
    }
+
}
modified radicle/src/test/arbitrary.rs
@@ -19,7 +19,7 @@ use crate::identity::{
    Did,
};
use crate::node::address::AddressType;
-
use crate::node::{Address, Alias};
+
use crate::node::{Address, Alias, Timestamp};
use crate::storage;
use crate::storage::refs::{Refs, RefsAt, SignedRefs};
use crate::test::storage::{MockRepository, MockStorage};
@@ -312,3 +312,9 @@ impl Arbitrary for Alias {
        Alias::from_str(s).unwrap()
    }
}
+

+
impl Arbitrary for Timestamp {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        Self::from(u64::arbitrary(g))
+
    }
+
}