Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
cli: Rework `rad sync`
Merged did:key:z6MksFqX...wzpT opened 2 years ago

The command now works with both a replica target and a seeds target. This is especially useful to eg. ensure that a preferred seed has been synced, while still trying to hit a higher replica count.

8 files changed +205 -150 c13c658f b994c4a6
modified radicle-cli/examples/rad-clone-connect.md
@@ -4,10 +4,10 @@ automatically connect to the necessary seeds.
```
$ rad clone rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji
✓ Seeding policy updated for rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji with scope 'all'
-
✓ Connecting to z6MknSL…StBU8Vi@[..]
-
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6MknSL…StBU8Vi..
✓ Connecting to z6Mkt67…v4N1tRk@[..]
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
+
✓ Connecting to z6MknSL…StBU8Vi@[..]
+
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6MknSL…StBU8Vi..
✓ Creating checkout in ./heartwood..
✓ Remote alice@z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi added
✓ Remote-tracking branch alice@z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi/master created for z6MknSL…StBU8Vi
modified radicle-cli/examples/rad-sync.md
@@ -47,7 +47,7 @@ be up to date.

```
$ rad sync --announce
-
✓ Nothing to announce, already in sync with network (see `rad sync status`)
+
✓ Nothing to announce, already in sync with 2 node(s) (see `rad sync status`)
```

We can also use the `--fetch` option to only fetch objects:
@@ -66,7 +66,7 @@ $ rad sync --fetch --announce
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetched repository from 2 seed(s)
-
✓ Nothing to announce, already in sync with network (see `rad sync status`)
+
✓ Nothing to announce, already in sync with 2 node(s) (see `rad sync status`)
```

It's also possible to use the `--seed` flag to only sync with a specific node:
modified radicle-cli/src/commands/clone.rs
@@ -60,8 +60,8 @@ pub struct Options {
    directory: Option<PathBuf>,
    /// The seeding scope of the repository.
    scope: Scope,
-
    /// Sync mode.
-
    mode: sync::RepoSync,
+
    /// Sync settings.
+
    sync: sync::RepoSync,
    /// Fetch timeout.
    timeout: time::Duration,
}
@@ -73,7 +73,7 @@ impl Args for Options {
        let mut parser = lexopt::Parser::from_args(args);
        let mut id: Option<RepoId> = None;
        let mut scope = Scope::All;
-
        let mut mode = sync::RepoSync::default();
+
        let mut sync = sync::RepoSync::default();
        let mut timeout = time::Duration::from_secs(9);
        let mut directory = None;

@@ -83,11 +83,8 @@ impl Args for Options {
                    let value = parser.value()?;
                    let value = term::args::nid(&value)?;

-
                    if let sync::RepoSync::Seeds(seeds) = &mut mode {
-
                        seeds.push(value);
-
                    } else {
-
                        mode = sync::RepoSync::Seeds(vec![value]);
-
                    }
+
                    sync.seeds.insert(value);
+
                    sync.replicas = sync.seeds.len();
                }
                Long("scope") => {
                    let value = parser.value()?;
@@ -129,7 +126,7 @@ impl Args for Options {
                id,
                directory,
                scope,
-
                mode,
+
                sync,
                timeout,
            },
            vec![],
@@ -152,7 +149,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        options.id,
        options.directory.clone(),
        options.scope,
-
        options.mode,
+
        options.sync.with_profile(&profile),
        options.timeout,
        &mut node,
        &signer,
@@ -238,7 +235,7 @@ pub fn clone<G: Signer>(
    id: RepoId,
    directory: Option<PathBuf>,
    scope: Scope,
-
    mode: sync::RepoSync,
+
    settings: sync::RepoSync,
    timeout: time::Duration,
    node: &mut Node,
    signer: &G,
@@ -262,7 +259,7 @@ pub fn clone<G: Signer>(
        );
    }

-
    let results = sync::fetch(id, mode, timeout, node)?;
+
    let results = sync::fetch(id, settings, timeout, node)?;
    let Ok(repository) = storage.repository(id) else {
        // If we don't have the repository locally, even after attempting to fetch,
        // there's nothing we can do.
modified radicle-cli/src/commands/remote/add.rs
@@ -31,7 +31,7 @@ pub fn run(
            follow::follow(*nid, alias, &mut node, profile)?;
            sync::fetch(
                rid,
-
                sync::RepoSync::default(),
+
                sync::RepoSync::default().with_profile(profile),
                time::Duration::from_secs(9),
                &mut node,
            )?;
modified radicle-cli/src/commands/seed.rs
@@ -137,7 +137,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
            if fetch && node.is_running() {
                sync::fetch(
                    rid,
-
                    sync::RepoSync::default(),
+
                    sync::RepoSync::default().with_profile(&profile),
                    time::Duration::from_secs(6),
                    &mut node,
                )?;
modified radicle-cli/src/commands/sync.rs
@@ -1,4 +1,5 @@
use std::cmp::Ordering;
+
use std::collections::BTreeSet;
use std::ffi::OsString;
use std::ops::ControlFlow;
use std::str::FromStr;
@@ -98,7 +99,7 @@ impl FromStr for SortBy {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncMode {
    Repo {
-
        mode: RepoSync,
+
        settings: RepoSync,
        direction: SyncDirection,
    },
    Inventory,
@@ -107,24 +108,55 @@ pub enum SyncMode {
impl Default for SyncMode {
    fn default() -> Self {
        Self::Repo {
-
            mode: RepoSync::default(),
+
            settings: RepoSync::default(),
            direction: SyncDirection::default(),
        }
    }
}

-
/// Repository sync mode.
+
/// Repository sync settings.
#[derive(Debug, Clone, PartialEq, Eq)]
-
pub enum RepoSync {
-
    /// Sync with N replicas.
-
    Replicas(usize),
+
pub struct RepoSync {
+
    /// Sync with at least N replicas.
+
    pub replicas: usize,
    /// Sync with the given list of seeds.
-
    Seeds(Vec<NodeId>),
+
    pub seeds: BTreeSet<NodeId>,
+
}
+

+
impl RepoSync {
+
    pub fn from_seeds(seeds: impl IntoIterator<Item = NodeId>) -> Self {
+
        let seeds = BTreeSet::from_iter(seeds);
+
        Self {
+
            replicas: seeds.len(),
+
            seeds,
+
        }
+
    }
+

+
    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
+
    /// and removing the local node from the set.
+
    pub fn with_profile(mut self, profile: &Profile) -> Self {
+
        // If no seeds were specified, add up to `replica` seeds from the preferred seeds.
+
        if self.seeds.is_empty() {
+
            self.seeds = profile
+
                .config
+
                .preferred_seeds
+
                .iter()
+
                .map(|p| p.id)
+
                .take(self.replicas)
+
                .collect();
+
        }
+
        // Remove our local node from the seed set just in case it was added by mistake.
+
        self.seeds.remove(profile.id());
+
        self
+
    }
}

impl Default for RepoSync {
    fn default() -> Self {
-
        Self::Replicas(3)
+
        Self {
+
            replicas: 3,
+
            seeds: BTreeSet::new(),
+
        }
    }
}

@@ -157,7 +189,7 @@ impl Args for Options {
        let mut announce = false;
        let mut inventory = false;
        let mut replicas = None;
-
        let mut seeds = Vec::new();
+
        let mut seeds = BTreeSet::new();
        let mut sort_by = SortBy::default();
        let mut op: Option<Operation> = None;

@@ -182,7 +214,7 @@ impl Args for Options {
                    let val = parser.value()?;
                    let nid = term::args::nid(&val)?;

-
                    seeds.push(nid);
+
                    seeds.insert(nid);
                }
                Long("announce") | Short('a') => {
                    announce = true;
@@ -227,21 +259,21 @@ impl Args for Options {
                (true, false) => SyncDirection::Fetch,
                (false, true) => SyncDirection::Announce,
            };
-
            let mode = match (seeds, replicas) {
-
                (seeds, Some(replicas)) => {
-
                    if seeds.is_empty() {
-
                        RepoSync::Replicas(replicas)
-
                    } else {
-
                        anyhow::bail!("`--replicas` cannot be specified with `--seed`");
-
                    }
+
            let settings = if seeds.is_empty() {
+
                RepoSync {
+
                    replicas: replicas.unwrap_or(3),
+
                    seeds,
+
                }
+
            } else {
+
                RepoSync {
+
                    replicas: replicas.unwrap_or(seeds.len()),
+
                    seeds,
                }
-
                (seeds, None) if !seeds.is_empty() => RepoSync::Seeds(seeds),
-
                (_, None) => RepoSync::default(),
            };
-
            if direction == SyncDirection::Announce && matches!(mode, RepoSync::Seeds(_)) {
-
                anyhow::bail!("`--seed` is only supported when fetching");
+
            SyncMode::Repo {
+
                settings,
+
                direction,
            }
-
            SyncMode::Repo { mode, direction }
        };

        Ok((
@@ -279,12 +311,17 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        Operation::Status => {
            sync_status(rid, &mut node, &profile, &options)?;
        }
-
        Operation::Synchronize(SyncMode::Repo { mode, direction }) => {
+
        Operation::Synchronize(SyncMode::Repo {
+
            settings,
+
            direction,
+
        }) => {
+
            let settings = settings.with_profile(&profile);
+

            if [SyncDirection::Fetch, SyncDirection::Both].contains(&direction) {
                if !profile.policies()?.is_seeding(&rid)? {
                    anyhow::bail!("repository {rid} is not seeded");
                }
-
                let results = fetch(rid, mode.clone(), options.timeout, &mut node)?;
+
                let results = fetch(rid, settings.clone(), options.timeout, &mut node)?;
                let success = results.success().count();
                let failed = results.failed().count();

@@ -297,7 +334,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                }
            }
            if [SyncDirection::Announce, SyncDirection::Both].contains(&direction) {
-
                announce_refs(rid, mode, options.timeout, node, &profile)?;
+
                announce_refs(rid, settings, options.timeout, &mut node, &profile)?;
            }
        }
        Operation::Synchronize(SyncMode::Inventory) => {
@@ -378,9 +415,9 @@ fn sync_status(

fn announce_refs(
    rid: RepoId,
-
    mode: RepoSync,
+
    settings: RepoSync,
    timeout: time::Duration,
-
    mut node: Node,
+
    node: &mut Node,
    profile: &Profile,
) -> anyhow::Result<()> {
    let Ok(repo) = profile.storage.repository(rid) else {
@@ -390,43 +427,32 @@ fn announce_refs(
    };
    let doc = repo.identity_doc()?;
    let unsynced: Vec<_> = if doc.visibility.is_public() {
-
        let seeds = node.seeds(rid)?;
-
        let synced = seeds.iter().filter(|s| s.is_synced());
-

-
        match mode {
-
            RepoSync::Seeds(ref seeds) => {
-
                let synced = synced.map(|s| s.nid).collect::<Vec<_>>();
-
                if seeds.iter().all(|s| synced.contains(s)) {
-
                    term::success!(
-
                        "Already in sync with the specified seed(s) (see `rad sync status`)"
-
                    );
-
                    return Ok(());
-
                }
-
            }
-
            RepoSync::Replicas(replicas) => {
-
                let synced = synced
-
                    .filter(|s| &s.nid != profile.id())
-
                    .collect::<Vec<_>>();
-
                let connected = seeds
-
                    .connected()
-
                    .filter(|s| &s.nid != profile.id())
-
                    .collect::<Vec<_>>();
-
                if synced.len() >= connected.len() {
-
                    term::success!(
-
                        "Nothing to announce, already in sync with network (see `rad sync status`)"
-
                    );
-
                    return Ok(());
-
                }
-
                // Replicas not counting our local replica.
-
                let remotes = synced.len();
-
                if remotes >= replicas {
-
                    term::success!("Nothing to announce, already in sync with {remotes} seed(s) (see `rad sync status`)");
-
                    return Ok(());
-
                }
-
            }
+
        // All seeds.
+
        let all = node.seeds(rid)?;
+
        // Seeds in sync with us.
+
        let synced = all.iter().filter(|s| s.is_synced());
+
        // Replicas not counting our local replica.
+
        let replicas = all
+
            .iter()
+
            .filter(|s| s.is_synced() && &s.nid != profile.id())
+
            .count();
+
        // Maximum replication factor we can achieve.
+
        let max_replicas = all.iter().filter(|s| &s.nid != profile.id()).count();
+
        // If the seeds we specified in the sync settings are all synced.
+
        let is_seeds_synced = {
+
            let synced = synced.map(|s| s.nid).collect::<BTreeSet<_>>();
+
            settings.seeds.iter().all(|s| synced.contains(s))
+
        };
+
        // If we met our desired replica count. Note that this can never exceed the maximum count.
+
        let is_replicas_synced = replicas >= settings.replicas.min(max_replicas);
+

+
        // Nothing to do if we've met our sync state.
+
        if is_seeds_synced && is_replicas_synced {
+
            term::success!("Nothing to announce, already in sync with {replicas} node(s) (see `rad sync status`)");
+
            return Ok(());
        }
-
        seeds
-
            .connected()
+
        // Return nodes we can announce to.
+
        all.connected()
            .filter(|s| !s.is_synced())
            .map(|s| s.nid)
            .collect()
@@ -444,16 +470,18 @@ fn announce_refs(
    }

    let mut spinner = term::spinner(format!("Syncing with {} node(s)..", unsynced.len()));
-
    let cutoff = if let RepoSync::Replicas(replicas) = mode {
-
        Some(replicas)
-
    } else {
-
        None
-
    };
-
    let result = node.announce(rid, unsynced, timeout, |event, synced| match event {
+
    let result = node.announce(rid, unsynced, timeout, |event, replicas| match event {
        node::AnnounceEvent::Announced => ControlFlow::Continue(()),
        node::AnnounceEvent::RefsSynced { remote } => {
            spinner.message(format!("Synced with {remote}.."));
-
            if Some(synced.len()) == cutoff {
+

+
            // We're done syncing when both of these conditions are met:
+
            //
+
            // 1. We've matched or exceeded our target replica count.
+
            // 2. We've synced with the seeds specified manually.
+
            if replicas.len() >= settings.replicas
+
                && settings.seeds.iter().all(|s| replicas.contains(s))
+
            {
                ControlFlow::Break(())
            } else {
                ControlFlow::Continue(())
@@ -489,43 +517,48 @@ pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {

pub fn fetch(
    rid: RepoId,
-
    mode: RepoSync,
-
    timeout: time::Duration,
-
    node: &mut Node,
-
) -> Result<FetchResults, node::Error> {
-
    match mode {
-
        RepoSync::Seeds(seeds) => {
-
            let mut results = FetchResults::default();
-
            for seed in seeds {
-
                let result = fetch_from(rid, &seed, timeout, node)?;
-
                results.push(seed, result);
-
            }
-
            Ok(results)
-
        }
-
        RepoSync::Replicas(count) => fetch_all(rid, count, timeout, node),
-
    }
-
}
-

-
fn fetch_all(
-
    rid: RepoId,
-
    count: usize,
+
    settings: RepoSync,
    timeout: time::Duration,
    node: &mut Node,
) -> Result<FetchResults, node::Error> {
+
    let local = node.nid()?;
    // Get seeds. This consults the local routing table only.
    let seeds = node.seeds(rid)?;
+
    // Target replicas, clamped by the maximum replicas possible.
+
    let replicas = settings
+
        .replicas
+
        .min(seeds.iter().filter(|s| s.nid != local).count());
+
    let sessions = node.sessions()?;
    let mut results = FetchResults::default();
    let (connected, mut disconnected) = seeds.partition();
-
    let local = node.nid()?;
+

+
    // Fetch from specified seeds, plus our preferred seeds.
+
    for nid in &settings.seeds {
+
        if !sessions.iter().any(|s| &s.nid == nid) {
+
            term::warning("node {nid} is not connected.. skipping");
+
            continue;
+
        }
+
        let result = fetch_from(rid, nid, timeout, node)?;
+
        results.push(*nid, result);
+
    }
+
    if results.success().count() >= replicas {
+
        return Ok(results);
+
    }

    // Fetch from connected seeds.
-
    for seed in connected.iter().take(count) {
-
        let result = fetch_from(rid, &seed.nid, timeout, node)?;
-
        results.push(seed.nid, result);
+
    let connected = connected
+
        .into_iter()
+
        .filter(|c| !results.contains(&c.nid))
+
        .map(|c| c.nid)
+
        .take(replicas)
+
        .collect::<Vec<_>>();
+
    for nid in connected {
+
        let result = fetch_from(rid, &nid, timeout, node)?;
+
        results.push(nid, result);
    }

    // Try to connect to disconnected seeds and fetch from them.
-
    while results.success().count() < count {
+
    while results.success().count() < replicas {
        let Some(seed) = disconnected.pop() else {
            break;
        };
@@ -533,40 +566,56 @@ fn fetch_all(
            // Skip our own node.
            continue;
        }
-
        // Try all seed addresses until one succeeds.
-
        for ka in seed.addrs {
-
            let spinner = term::spinner(format!(
-
                "Connecting to {}@{}..",
-
                term::format::tertiary(term::format::node(&seed.nid)),
-
                &ka.addr
-
            ));
-
            let cr = node.connect(
-
                seed.nid,
-
                ka.addr,
-
                node::ConnectOptions {
-
                    persistent: false,
-
                    timeout,
-
                },
-
            )?;
-

-
            match cr {
-
                node::ConnectResult::Connected => {
-
                    spinner.finish();
-
                    let result = fetch_from(rid, &seed.nid, timeout, node)?;
-
                    results.push(seed.nid, result);
-
                    break;
-
                }
-
                node::ConnectResult::Disconnected { .. } => {
-
                    spinner.failed();
-
                    continue;
-
                }
-
            }
+
        if connect(
+
            seed.nid,
+
            seed.addrs.into_iter().map(|ka| ka.addr),
+
            timeout,
+
            node,
+
        )? {
+
            let result = fetch_from(rid, &seed.nid, timeout, node)?;
+
            results.push(seed.nid, result);
        }
    }

    Ok(results)
}

+
fn connect(
+
    nid: NodeId,
+
    addrs: impl Iterator<Item = node::Address>,
+
    timeout: time::Duration,
+
    node: &mut Node,
+
) -> Result<bool, node::Error> {
+
    // Try all addresses until one succeeds.
+
    for addr in addrs {
+
        let spinner = term::spinner(format!(
+
            "Connecting to {}@{}..",
+
            term::format::tertiary(term::format::node(&nid)),
+
            &addr
+
        ));
+
        let cr = node.connect(
+
            nid,
+
            addr,
+
            node::ConnectOptions {
+
                persistent: false,
+
                timeout,
+
            },
+
        )?;
+

+
        match cr {
+
            node::ConnectResult::Connected => {
+
                spinner.finish();
+
                return Ok(true);
+
            }
+
            node::ConnectResult::Disconnected { .. } => {
+
                spinner.failed();
+
                continue;
+
            }
+
        }
+
    }
+
    Ok(false)
+
}
+

fn fetch_from(
    rid: RepoId,
    seed: &NodeId,
modified radicle/src/identity/doc.rs
@@ -196,6 +196,11 @@ impl Visibility {
        matches!(self, Self::Public)
    }

+
    /// Check whether the visibility is private.
+
    pub fn is_private(&self) -> bool {
+
        matches!(self, Self::Private { .. })
+
    }
+

    /// Private visibility with list of allowed DIDs beyond the repository delegates.
    pub fn private(allow: impl IntoIterator<Item = Did>) -> Self {
        Self::Private {
modified radicle/src/node.rs
@@ -691,6 +691,11 @@ impl FetchResults {
        self.0.push((nid, result));
    }

+
    /// Check if the results contains the given NID.
+
    pub fn contains(&self, nid: &NodeId) -> bool {
+
        self.0.iter().any(|(n, _)| n == nid)
+
    }
+

    /// Iterate over all fetch results.
    pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &FetchResult)> {
        self.0.iter().map(|(nid, r)| (nid, r))
@@ -895,9 +900,9 @@ impl Node {
        mut callback: impl FnMut(AnnounceEvent, &[PublicKey]) -> ControlFlow<()>,
    ) -> Result<AnnounceResult, Error> {
        let events = self.subscribe(timeout)?;
-
        let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>();
        let refs = self.announce_refs(rid)?;

+
        let mut unsynced = seeds.into_iter().collect::<BTreeSet<_>>();
        let mut synced = Vec::new();
        let mut timeout: Vec<NodeId> = Vec::new();

@@ -909,23 +914,22 @@ impl Node {
                    remote,
                    rid: rid_,
                    at,
-
                }) if rid == rid_ => {
-
                    if seeds.remove(&remote) && refs.at == at {
-
                        synced.push(remote);
-
                        if callback(AnnounceEvent::RefsSynced { remote }, &synced).is_break() {
-
                            break;
-
                        }
+
                }) if rid == rid_ && refs.at == at => {
+
                    unsynced.remove(&remote);
+
                    synced.push(remote);
+
                    if callback(AnnounceEvent::RefsSynced { remote }, &synced).is_break() {
+
                        break;
                    }
                }
                Ok(_) => {}

                Err(Error::TimedOut) => {
-
                    timeout.extend(seeds.iter());
+
                    timeout.extend(unsynced.iter());
                    break;
                }
                Err(e) => return Err(e),
            }
-
            if seeds.is_empty() {
+
            if unsynced.is_empty() {
                break;
            }
        }