Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cli: Implement `rad sync status` command
cloudhead committed 2 years ago
commit d3b0483f426f60a36f0c2bd0f8745953429e15d6
parent ca557909c2bebe2b72bbe9734fe38d49129b5e13
6 files changed +172 -18
modified radicle-cli/examples/rad-sync.md
@@ -7,6 +7,18 @@ For instance let's create an issue and sync it with the network:
$ rad issue open --title "Test `rad sync`" --description "Check that the command works" -q --no-announce
```

+
If we check the sync status, we see that our peers are out of sync:
+

+
```
+
$ rad sync status
+
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+
│ ●   NID                                                Address                Status        At        Timestamp │
+
├─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
+
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776   out-of-sync   f209c9f   [  ...  ] │
+
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776   out-of-sync   f209c9f   [  ...  ] │
+
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+
```
+

Now let's run `rad sync`. This will announce the issue refs to the network and
wait for nodes to announce that they have fetched those refs.

@@ -57,3 +69,15 @@ $ rad sync --fetch --replicas 1
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetched repository from 1 seed(s)
```
+

+
We can check the sync status again to make sure everything's in sync:
+

+
```
+
$ rad sync status
+
╭────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+
│ ●   NID                                                Address                Status   At        Timestamp │
+
├────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
+
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776   synced   9f615f9   [  ...  ] │
+
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776   synced   9f615f9   [  ...  ] │
+
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+
```
modified radicle-cli/src/commands/sync.rs
@@ -4,12 +4,14 @@ use std::time;
use anyhow::{anyhow, Context as _};

use radicle::node;
-
use radicle::node::{FetchResult, FetchResults, Handle as _, Node};
+
use radicle::node::{FetchResult, FetchResults, Handle as _, Node, SyncStatus};
use radicle::prelude::{Id, NodeId, Profile};
use radicle::storage::{ReadRepository, ReadStorage};
+
use radicle_term::Element;

use crate::terminal as term;
use crate::terminal::args::{Args, Error, Help};
+
use crate::terminal::{Table, TableOptions};

pub const HELP: Help = Help {
    name: "sync",
@@ -20,6 +22,7 @@ Usage

    rad sync [--fetch | --announce] [<rid>] [<option>...]
    rad sync --inventory [<option>...]
+
    rad sync status [<rid>] [<option>...]

    By default, the current repository is synchronized both ways.
    If an <rid> is specified, that repository is synced instead.
@@ -40,6 +43,10 @@ Usage
    If `--inventory` is specified, the node's inventory is announced to
    the network. This mode does not take an `<rid>`.

+
Commands
+

+
    status                    Display the sync status of a repository
+

Options

    --fetch, -f               Turn on fetching (default: true)
@@ -53,6 +60,13 @@ Options
"#,
};

+
#[derive(Debug, Clone, PartialEq, Eq, Default)]
+
pub enum Operation {
+
    Synchronize(SyncMode),
+
    #[default]
+
    Status,
+
}
+

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncMode {
    Repo {
@@ -99,7 +113,7 @@ pub struct Options {
    pub rid: Option<Id>,
    pub verbose: bool,
    pub timeout: time::Duration,
-
    pub sync: SyncMode,
+
    pub op: Operation,
}

impl Args for Options {
@@ -115,6 +129,7 @@ impl Args for Options {
        let mut inventory = false;
        let mut replicas = None;
        let mut seeds = Vec::new();
+
        let mut op: Option<Operation> = None;

        while let Some(arg) = parser.next()? {
            match arg {
@@ -154,9 +169,14 @@ impl Args for Options {
                Long("help") | Short('h') => {
                    return Err(Error::Help.into());
                }
-
                Value(val) if rid.is_none() => {
-
                    rid = Some(term::args::rid(&val)?);
-
                }
+
                Value(val) if rid.is_none() => match val.to_string_lossy().as_ref() {
+
                    "s" | "status" => {
+
                        op = Some(Operation::Status);
+
                    }
+
                    _ => {
+
                        rid = Some(term::args::rid(&val)?);
+
                    }
+
                },
                arg => {
                    return Err(anyhow!(arg.unexpected()));
                }
@@ -195,7 +215,7 @@ impl Args for Options {
                rid,
                verbose,
                timeout,
-
                sync,
+
                op: op.unwrap_or(Operation::Synchronize(sync)),
            },
            vec![],
        ))
@@ -220,8 +240,11 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        );
    }

-
    match options.sync {
-
        SyncMode::Repo { mode, direction } => {
+
    match options.op {
+
        Operation::Status => {
+
            sync_status(rid, &mut node)?;
+
        }
+
        Operation::Synchronize(SyncMode::Repo { mode, direction }) => {
            if [SyncDirection::Fetch, SyncDirection::Both].contains(&direction) {
                if !profile.tracking()?.is_repo_tracked(&rid)? {
                    anyhow::bail!("repository {rid} is not tracked");
@@ -240,13 +263,69 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                announce_refs(rid, mode, options.timeout, node, &profile)?;
            }
        }
-
        SyncMode::Inventory => {
+
        Operation::Synchronize(SyncMode::Inventory) => {
            announce_inventory(node)?;
        }
    }
    Ok(())
}

+
fn sync_status(rid: Id, node: &mut Node) -> anyhow::Result<()> {
+
    let mut table = Table::<6, term::Label>::new(TableOptions::bordered());
+
    let seeds: Vec<_> = node.seeds(rid)?.into();
+

+
    table.push([
+
        term::format::dim(String::from("●")).into(),
+
        term::format::bold(String::from("NID")).into(),
+
        term::format::bold(String::from("Address")).into(),
+
        term::format::bold(String::from("Status")).into(),
+
        term::format::bold(String::from("At")).into(),
+
        term::format::bold(String::from("Timestamp")).into(),
+
    ]);
+
    table.divider();
+

+
    for seed in seeds {
+
        let (icon, status, refs, time) = match seed.sync {
+
            Some(SyncStatus::Synced { at }) => (
+
                term::format::positive("●"),
+
                term::format::positive("synced"),
+
                term::label(term::format::oid(at.oid)),
+
                term::format::timestamp(at.timestamp).into(),
+
            ),
+
            Some(SyncStatus::OutOfSync { remote, .. }) => (
+
                term::format::negative("●"),
+
                term::format::negative("out-of-sync"),
+
                term::label(term::format::oid(remote.oid)),
+
                term::format::timestamp(remote.timestamp).into(),
+
            ),
+
            None => (
+
                term::format::yellow("●"),
+
                term::format::yellow("?"),
+
                term::label("?"),
+
                term::label("?"),
+
            ),
+
        };
+
        let addr = seed
+
            .addrs
+
            .first()
+
            .map(|a| a.addr.to_string())
+
            .unwrap_or_default()
+
            .into();
+

+
        table.push([
+
            icon.into(),
+
            seed.nid.to_human().into(),
+
            addr,
+
            status.into(),
+
            refs,
+
            time,
+
        ]);
+
    }
+
    table.print();
+

+
    Ok(())
+
}
+

fn announce_refs(
    rid: Id,
    _mode: RepoSync,
modified radicle-cli/tests/commands.rs
@@ -45,6 +45,16 @@ fn test<'a>(
    Ok(())
}

+
/// Test config.
+
fn config(alias: &'static str) -> Config {
+
    Config {
+
        external_addresses: vec![
+
            node::Address::from_str(&format!("{alias}.radicle.xyz:8776")).unwrap(),
+
        ],
+
        ..Config::test(Alias::new(alias))
+
    }
+
}
+

fn formula(root: &Path, test: impl AsRef<Path>) -> Result<TestFormula, Box<dyn std::error::Error>> {
    let mut formula = TestFormula::new(root.to_path_buf());
    let base = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -1018,9 +1028,9 @@ fn test_cob_deletion() {
fn rad_sync() {
    let mut environment = Environment::new();
    let working = environment.tmp().join("working");
-
    let alice = environment.node(Config::test(Alias::new("alice")));
-
    let bob = environment.node(Config::test(Alias::new("bob")));
-
    let eve = environment.node(Config::test(Alias::new("eve")));
+
    let alice = environment.node(config("alice"));
+
    let bob = environment.node(config("bob"));
+
    let eve = environment.node(config("eve"));
    let acme = Id::from_str("z42hL2jL4XNk6K8oHQaSWfMgCL7ji").unwrap();

    fixtures::repository(working.join("acme"));
modified radicle-node/src/service.rs
@@ -32,7 +32,9 @@ use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node::routing;
use crate::node::routing::InsertResult;
-
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus};
+
use crate::node::{
+
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
+
};
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Info, Ping};
@@ -116,6 +118,8 @@ pub enum Error {
    #[error(transparent)]
    Git(#[from] radicle::git::raw::Error),
    #[error(transparent)]
+
    GitExt(#[from] radicle::git::ext::Error),
+
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
@@ -1137,6 +1141,27 @@ where
                    }
                }

+
                // Update sync status for this repo.
+
                if let Some(refs) = message.refs.iter().find(|r| &r.remote == self.nid()) {
+
                    match self
+
                        .addresses
+
                        .synced(&message.rid, announcer, refs.at, message.timestamp)
+
                    {
+
                        Ok(updated) => {
+
                            if updated {
+
                                debug!(
+
                                    target: "service",
+
                                    "Updating sync status of {announcer} for {} to {}",
+
                                    message.rid, refs.at
+
                                );
+
                            }
+
                        }
+
                        Err(e) => {
+
                            error!(target: "service", "Error updating sync status for {}: {e}", message.rid);
+
                        }
+
                    }
+
                }
+

                // TODO: Buffer/throttle fetches.
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo tracking configuration",
@@ -1531,7 +1556,9 @@ where
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
-
            if refs.push(RefsAt::new(&repo, remote_id)?).is_err() {
+
            let refs_at = RefsAt::new(&repo, remote_id)?;
+

+
            if refs.push(refs_at).is_err() {
                warn!(
                    target: "service",
                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
@@ -1634,9 +1661,11 @@ where
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
+
                        let local = SyncedAt::new(local.at, &repo)?;
+

                        SyncStatus::OutOfSync {
-
                            local: local.at,
-
                            remote: seed.synced_at.oid,
+
                            local,
+
                            remote: seed.synced_at,
                        }
                    };
                    seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
modified radicle/src/node.rs
@@ -141,9 +141,9 @@ pub enum SyncStatus {
    #[serde(rename_all = "camelCase")]
    OutOfSync {
        /// Local head of our `rad/sigrefs`.
-
        local: git_ext::Oid,
+
        local: SyncedAt,
        /// Remote head of our `rad/sigrefs`.
-
        remote: git_ext::Oid,
+
        remote: SyncedAt,
    },
}

modified radicle/src/node/address/types.rs
@@ -6,8 +6,10 @@ use localtime::LocalTime;
use nonempty::NonEmpty;

use crate::collections::RandomMap;
+
use crate::git;
use crate::node::{Address, Alias};
use crate::prelude::{NodeId, Timestamp};
+
use crate::storage::ReadRepository;
use crate::{node, profile};

/// A map with the ability to randomly select values.
@@ -193,6 +195,16 @@ pub struct SyncedAt {
    pub timestamp: LocalTime,
}

+
impl SyncedAt {
+
    /// Create a new [`SyncedAt`] given an OID, by looking up the timestamp in the repo.
+
    pub fn new<S: ReadRepository>(oid: git::ext::Oid, repo: &S) -> Result<Self, git::ext::Error> {
+
        let timestamp = repo.commit(oid)?.time();
+
        let timestamp = LocalTime::from_secs(timestamp.seconds() as u64);
+

+
        Ok(Self { oid, timestamp })
+
    }
+
}
+

/// Seed of a specific repository.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]