Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Rewrite `seeds` method to use sync-status
cloudhead committed 2 years ago
commit 269d21deca0f97d9468205074d4ee2ed17148818
parent 9f45405c648d246227d2653be4e22243b979094c
2 files changed +80 -27
modified radicle-node/src/service.rs
@@ -32,7 +32,7 @@ use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node::routing;
use crate::node::routing::InsertResult;
-
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds};
+
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus};
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
@@ -122,6 +122,8 @@ pub enum Error {
    #[error(transparent)]
    Routing(#[from] routing::Error),
    #[error(transparent)]
+
    Addresses(#[from] address::Error),
+
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
@@ -544,7 +546,7 @@ where
                    resp.send(seeds).ok();
                }
                Err(e) => {
-
                    error!(target: "service", "Error reading routing table for {rid}: {e}");
+
                    error!(target: "service", "Error getting seeds for {rid}: {e}");
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
@@ -1445,31 +1447,46 @@ where
    }

    fn seeds(&self, rid: &Id) -> Result<Seeds, Error> {
-
        match self.routing.get(rid) {
-
            Ok(seeds) => {
-
                Ok(seeds
-
                    .into_iter()
-
                    .fold(Seeds::new(self.rng.clone()), |mut seeds, node| {
-
                        if node != self.node_id() {
-
                            let addrs: Vec<KnownAddress> = self
-
                                .addresses
-
                                .get(&node)
-
                                .ok()
-
                                .flatten()
-
                                .map(|n| n.addrs)
-
                                .unwrap_or(vec![]);
-

-
                            if let Some(s) = self.sessions.get(&node) {
-
                                seeds.insert(Seed::new(node, addrs, Some(s.state.clone())));
-
                            } else {
-
                                seeds.insert(Seed::new(node, addrs, None));
-
                            }
+
        let mut seeds = Seeds::new(self.rng.clone());
+

+
        // First build a list from peers that have synced our own refs, if any.
+
        // This step is skipped if we don't have the repository yet, or don't have
+
        // our own refs.
+
        if let Ok(repo) = self.storage.repository(*rid) {
+
            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
+
                for seed in self.addresses.seeds(rid)? {
+
                    let seed = seed?;
+
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
+
                    let synced = if local.at == seed.synced_at.oid {
+
                        SyncStatus::Synced { at: seed.synced_at }
+
                    } else {
+
                        SyncStatus::OutOfSync {
+
                            local: local.at,
+
                            remote: seed.synced_at.oid,
                        }
-
                        seeds
-
                    }))
+
                    };
+
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
+
                }
            }
-
            Err(err) => Err(Error::Routing(err)),
        }
+

+
        // Then, add peers we know about but have no information about the sync status.
+
        // These peers have announced that they track the repository via an inventory
+
        // announcement, but we haven't received any ref announcements from them.
+
        for nid in self.routing.get(rid)? {
+
            if nid == self.node_id() {
+
                continue;
+
            }
+
            if seeds.contains(&nid) {
+
                // We already have a richer entry for this node.
+
                continue;
+
            }
+
            let addrs = self.addresses.addresses(&nid)?;
+
            let state = self.sessions.get(&nid).map(|s| s.state.clone());
+

+
            seeds.insert(Seed::new(nid, addrs, state, None));
+
        }
+
        Ok(seeds)
    }

    /// Return a new filter object, based on our tracking policy.
modified radicle/src/node.rs
@@ -26,7 +26,7 @@ use crate::identity::Id;
use crate::profile;
use crate::storage::RefUpdate;

-
pub use address::KnownAddress;
+
pub use address::{KnownAddress, SyncedAt};
pub use config::Config;
pub use cyphernet::addr::{HostName, PeerAddr};
pub use events::{Event, Events};
@@ -125,6 +125,26 @@ impl fmt::Display for State {
    }
}

+
/// Repository sync status for our own refs.
+
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
+
#[serde(tag = "status")]
+
pub enum SyncStatus {
+
    /// We're in sync.
+
    #[serde(rename_all = "camelCase")]
+
    Synced {
+
        /// At what ref was the remote synced at.
+
        at: SyncedAt,
+
    },
+
    /// We're out of sync.
+
    #[serde(rename_all = "camelCase")]
+
    OutOfSync {
+
        /// Local head of our `rad/sigrefs`.
+
        local: git_ext::Oid,
+
        /// Remote head of our `rad/sigrefs`.
+
        remote: git_ext::Oid,
+
    },
+
}
+

/// Node alias.
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
pub struct Alias(String);
@@ -427,6 +447,7 @@ pub struct Seed {
    pub nid: NodeId,
    pub addrs: Vec<KnownAddress>,
    pub state: Option<State>,
+
    pub sync: Option<SyncStatus>,
}

impl Seed {
@@ -435,8 +456,18 @@ impl Seed {
        matches!(self.state, Some(State::Connected { .. }))
    }

-
    pub fn new(nid: NodeId, addrs: Vec<KnownAddress>, state: Option<State>) -> Self {
-
        Self { nid, addrs, state }
+
    pub fn new(
+
        nid: NodeId,
+
        addrs: Vec<KnownAddress>,
+
        state: Option<State>,
+
        sync: Option<SyncStatus>,
+
    ) -> Self {
+
        Self {
+
            nid,
+
            addrs,
+
            state,
+
            sync,
+
        }
    }
}

@@ -457,6 +488,11 @@ impl Seeds {
        self.0.insert(seed.nid, seed);
    }

+
    /// Check membership.
+
    pub fn contains(&self, nid: &NodeId) -> bool {
+
        self.0.contains_key(nid)
+
    }
+

    /// Partitions the list of seeds into connected and disconnected seeds.
    /// Note that the disconnected seeds may be in a "connecting" state.
    pub fn partition(&self) -> (Vec<Seed>, Vec<Seed>) {