Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Add table and types for tracking sync status
cloudhead committed 2 years ago
commit 9f45405c648d246227d2653be4e22243b979094c
parent 3aad82d158590835d0737bfdd7125ca0b040a929
3 files changed +141 -23
modified radicle/src/node/address/schema.sql
@@ -60,3 +60,17 @@ create table if not exists "announcements" (
  --
  unique ("node", "repo", "type")
) strict;
+

+
-- Repository sync status.
+
create table if not exists "repo-sync-status" (
+
  -- Repository ID.
+
  "repo"                 text      not null,
+
  -- Node ID.
+
  "node"                 text      not null references "nodes" ("id"),
+
  -- Head of your `rad/sigrefs` branch that was synced.
+
  "head"                 text      not null,
+
  -- When this entry was last updated.
+
  "timestamp"            integer   not null,
+
  --
+
  unique ("repo", "node")
+
) strict;
modified radicle/src/node/address/store.rs
@@ -6,10 +6,11 @@ use localtime::LocalTime;
use sqlite as sql;
use thiserror::Error;

+
use crate::git::Oid;
use crate::node;
use crate::node::address::{KnownAddress, Source};
use crate::node::{Address, Alias, AliasError, AliasStore, NodeId};
-
use crate::prelude::Timestamp;
+
use crate::prelude::{Id, Timestamp};
use crate::sql::transaction;

use super::types;
@@ -93,26 +94,7 @@ impl Store for Book {
            let alias = Alias::from_str(row.read::<&str, _>("alias"))?;
            let timestamp = row.read::<i64, _>("timestamp") as Timestamp;
            let pow = row.read::<i64, _>("pow") as u32;
-
            let mut addrs = Vec::new();
-

-
            let mut stmt = self
-
                .db
-
                .prepare("SELECT type, value, source FROM addresses WHERE node = ?")?;
-
            stmt.bind((1, node))?;
-

-
            for row in stmt.into_iter() {
-
                let row = row?;
-
                let _typ = row.read::<AddressType, _>("type");
-
                let addr = row.read::<Address, _>("value");
-
                let source = row.read::<Source, _>("source");
-

-
                addrs.push(KnownAddress {
-
                    addr,
-
                    source,
-
                    last_success: None,
-
                    last_attempt: None,
-
                });
-
            }
+
            let addrs = self.addresses(node)?;

            Ok(Some(types::Node {
                features,
@@ -126,6 +108,35 @@ impl Store for Book {
        }
    }

+
    fn addresses(&self, node: &NodeId) -> Result<Vec<KnownAddress>, Error> {
+
        let mut addrs = Vec::new();
+
        let mut stmt = self.db.prepare(
+
            "SELECT type, value, source, last_attempt, last_success FROM addresses WHERE node = ?",
+
        )?;
+
        stmt.bind((1, node))?;
+

+
        for row in stmt.into_iter() {
+
            let row = row?;
+
            let _typ = row.read::<AddressType, _>("type");
+
            let addr = row.read::<Address, _>("value");
+
            let source = row.read::<Source, _>("source");
+
            let last_attempt = row
+
                .read::<Option<i64>, _>("last_attempt")
+
                .map(|t| LocalTime::from_millis(t as u128));
+
            let last_success = row
+
                .read::<Option<i64>, _>("last_success")
+
                .map(|t| LocalTime::from_millis(t as u128));
+

+
            addrs.push(KnownAddress {
+
                addr,
+
                source,
+
                last_success,
+
                last_attempt,
+
            });
+
        }
+
        Ok(addrs)
+
    }
+

    fn len(&self) -> Result<usize, Error> {
        let row = self
            .db
@@ -201,6 +212,62 @@ impl Store for Book {
        .map_err(Error::from)
    }

+
    fn synced(
+
        &mut self,
+
        rid: &Id,
+
        nid: &NodeId,
+
        at: Oid,
+
        timestamp: Timestamp,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO repo-sync-status (repo, node, head, timestamp)
+
             VALUES (?1, ?2, ?3, ?4)
+
             ON CONFLICT DO UPDATE
+
             SET head = ?3, timestamp = ?4
+
             WHERE timestamp < ?4",
+
        )?;
+
        stmt.bind((1, rid))?;
+
        stmt.bind((2, nid))?;
+
        stmt.bind((3, at.to_string().as_str()))?;
+
        stmt.bind((3, timestamp as i64))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn seeds(
+
        &self,
+
        rid: &Id,
+
    ) -> Result<Box<dyn Iterator<Item = Result<types::Seed, Error>> + '_>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT node, head, timestamp
+
             FROM `repo-sync-status`
+
             WHERE repo = ?",
+
        )?;
+
        stmt.bind((1, rid))?;
+

+
        Ok(Box::new(stmt.into_iter().map(|row| {
+
            let row = row?;
+
            let nid = row.try_read::<NodeId, _>("node")?;
+
            let oid = row.try_read::<&str, _>("head")?;
+
            let oid = Oid::from_str(oid).map_err(|e| {
+
                Error::Internal(sql::Error {
+
                    code: None,
+
                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
+
                })
+
            })?;
+
            let timestamp = row.try_read::<i64, _>("timestamp")?;
+
            let timestamp = LocalTime::from_millis(timestamp as u128);
+
            let addresses = self.addresses(&nid)?;
+

+
            Ok(types::Seed {
+
                nid,
+
                addresses,
+
                synced_at: types::SyncedAt { oid, timestamp },
+
            })
+
        })))
+
    }
+

    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> {
        let mut stmt = self
            .db
@@ -282,8 +349,10 @@ impl AliasStore for Book {
///
/// Used to store node addresses and metadata.
pub trait Store {
-
    /// Get a known peer address.
+
    /// Get the information we have about a node.
    fn get(&self, id: &NodeId) -> Result<Option<types::Node>, Error>;
+
    /// Get the addresses of a node.
+
    fn addresses(&self, node: &NodeId) -> Result<Vec<KnownAddress>, Error>;
    /// Insert a node with associated addresses into the store.
    ///
    /// Returns `true` if the node or addresses were updated, and `false` otherwise.
@@ -298,6 +367,19 @@ pub trait Store {
    ) -> Result<bool, Error>;
    /// Remove an address from the store.
    fn remove(&mut self, id: &NodeId) -> Result<bool, Error>;
+
    /// Mark a repo as synced on the given node.
+
    fn synced(
+
        &mut self,
+
        rid: &Id,
+
        nid: &NodeId,
+
        at: Oid,
+
        timestamp: Timestamp,
+
    ) -> Result<bool, Error>;
+
    /// Get nodes that have synced the given repo.
+
    fn seeds(
+
        &self,
+
        rid: &Id,
+
    ) -> Result<Box<dyn Iterator<Item = Result<types::Seed, Error>> + '_>, Error>;
    /// Returns the number of addresses.
    fn len(&self) -> Result<usize, Error>;
    /// Returns true if there are no addresses.
modified radicle/src/node/address/types.rs
@@ -7,7 +7,7 @@ use nonempty::NonEmpty;

use crate::collections::RandomMap;
use crate::node::{Address, Alias};
-
use crate::prelude::Timestamp;
+
use crate::prelude::{NodeId, Timestamp};
use crate::{node, profile};

/// A map with the ability to randomly select values.
@@ -176,3 +176,25 @@ impl std::fmt::Display for Source {
        }
    }
}
+

+
/// Holds an oid and timestamp.
+
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct SyncedAt {
+
    /// Head of `rad/sigrefs`.
+
    pub oid: git_ext::Oid,
+
    /// When these refs were synced.
+
    pub timestamp: LocalTime,
+
}
+

+
/// Seed of a specific repository.
+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct Seed {
+
    /// The Node ID.
+
    pub nid: NodeId,
+
    /// Known addresses for this node.
+
    pub addresses: Vec<KnownAddress>,
+
    /// Sync information for a given repo.
+
    pub synced_at: SyncedAt,
+
}