Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
cli: Improve syncing code
Merged did:key:z6MksFqX...wzpT opened 2 years ago
  • radicle: Don’t update sync status redundantly
  • cli: Correctly honor sync timeout
  • cli: Unify all syncing code

See commits for details.

16 files changed +432 -292 9f4227d3 25c71627
modified rad-patch.1.adoc
@@ -186,6 +186,9 @@ The full list of options follows:
  Whether or not to sync with the network after the patch is opened. Defaults
  to _sync_.

+
*sync.debug*::
+
  Show debug information about the syncing process.
+

*patch.draft*::
  Open the patch as a _draft_. Turned off by default.

modified radicle-cli/src/commands/clone.rs
@@ -23,6 +23,7 @@ use radicle::storage::RepositoryError;

use crate::commands::rad_checkout as checkout;
use crate::commands::rad_sync as sync;
+
use crate::node::SyncSettings;
use crate::project;
use crate::terminal as term;
use crate::terminal::args::{Args, Error, Help};
@@ -62,7 +63,7 @@ pub struct Options {
    /// The seeding scope of the repository.
    scope: Scope,
    /// Sync settings.
-
    sync: sync::RepoSync,
+
    sync: SyncSettings,
    /// Fetch timeout.
    timeout: time::Duration,
}
@@ -74,7 +75,7 @@ impl Args for Options {
        let mut parser = lexopt::Parser::from_args(args);
        let mut id: Option<RepoId> = None;
        let mut scope = Scope::All;
-
        let mut sync = sync::RepoSync::default();
+
        let mut sync = SyncSettings::default();
        let mut timeout = time::Duration::from_secs(9);
        let mut directory = None;

@@ -150,8 +151,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        options.id,
        options.directory.clone(),
        options.scope,
-
        options.sync.with_profile(&profile),
-
        options.timeout,
+
        options.sync.with_profile(&profile).timeout(options.timeout),
        &mut node,
        &signer,
        &profile.storage,
@@ -236,8 +236,7 @@ pub fn clone<G: Signer>(
    id: RepoId,
    directory: Option<PathBuf>,
    scope: Scope,
-
    settings: sync::RepoSync,
-
    timeout: time::Duration,
+
    settings: SyncSettings,
    node: &mut Node,
    signer: &G,
    storage: &Storage,
@@ -260,7 +259,7 @@ pub fn clone<G: Signer>(
        );
    }

-
    let results = sync::fetch(id, settings, timeout, node)?;
+
    let results = sync::fetch(id, settings, node)?;
    let Ok(repository) = storage.repository(id) else {
        // If we don't have the repository locally, even after attempting to fetch,
        // there's nothing we can do.
modified radicle-cli/src/commands/issue.rs
@@ -593,7 +593,13 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

    if announce {
        let mut node = Node::new(profile.socket());
-
        node::announce(rid, &mut node)?;
+
        node::announce(
+
            &repo,
+
            node::SyncSettings::default(),
+
            node::SyncReporting::default(),
+
            &mut node,
+
            &profile,
+
        )?;
    }

    Ok(())
modified radicle-cli/src/commands/patch.rs
@@ -847,7 +847,13 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

    if announce {
        let mut node = Node::new(profile.socket());
-
        node::announce(rid, &mut node)?;
+
        node::announce(
+
            &repository,
+
            node::SyncSettings::default(),
+
            node::SyncReporting::default(),
+
            &mut node,
+
            &profile,
+
        )?;
    }
    Ok(())
}
modified radicle-cli/src/commands/remote/add.rs
@@ -1,5 +1,4 @@
use std::str::FromStr;
-
use std::time;

use radicle::git;
use radicle::git::RefString;
@@ -10,6 +9,7 @@ use radicle_crypto::PublicKey;
use crate::commands::rad_checkout as checkout;
use crate::commands::rad_follow as follow;
use crate::commands::rad_sync as sync;
+
use crate::node::SyncSettings;
use crate::project::SetupRemote;

pub fn run(
@@ -31,8 +31,7 @@ pub fn run(
            follow::follow(*nid, alias, &mut node, profile)?;
            sync::fetch(
                rid,
-
                sync::RepoSync::default().with_profile(profile),
-
                time::Duration::from_secs(9),
+
                SyncSettings::default().with_profile(profile),
                &mut node,
            )?;
        }
modified radicle-cli/src/commands/seed.rs
@@ -1,5 +1,4 @@
use std::ffi::OsString;
-
use std::time;

use anyhow::anyhow;

@@ -10,6 +9,7 @@ use radicle::{prelude::*, storage, Node};
use radicle_term::Element as _;

use crate::commands::rad_sync as sync;
+
use crate::node::SyncSettings;
use crate::terminal::args::{Args, Error, Help};
use crate::{project, terminal as term};

@@ -117,8 +117,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
            if fetch && node.is_running() {
                sync::fetch(
                    rid,
-
                    sync::RepoSync::default().with_profile(&profile),
-
                    time::Duration::from_secs(6),
+
                    SyncSettings::default().with_profile(&profile),
                    &mut node,
                )?;
            }
modified radicle-cli/src/commands/sync.rs
@@ -1,7 +1,6 @@
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ffi::OsString;
-
use std::ops::ControlFlow;
use std::str::FromStr;
use std::time;

@@ -12,9 +11,11 @@ use radicle::node::AliasStore;
use radicle::node::Seed;
use radicle::node::{FetchResult, FetchResults, Handle as _, Node, SyncStatus};
use radicle::prelude::{NodeId, Profile, RepoId};
-
use radicle::storage::{ReadRepository, ReadStorage};
+
use radicle::storage::ReadStorage;
use radicle_term::Element;

+
use crate::node::SyncReporting;
+
use crate::node::SyncSettings;
use crate::terminal as term;
use crate::terminal::args::{Args, Error, Help};
use crate::terminal::format::Author;
@@ -100,7 +101,7 @@ impl FromStr for SortBy {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncMode {
    Repo {
-
        settings: RepoSync,
+
        settings: SyncSettings,
        direction: SyncDirection,
    },
    Inventory,
@@ -109,58 +110,12 @@ pub enum SyncMode {
impl Default for SyncMode {
    fn default() -> Self {
        Self::Repo {
-
            settings: RepoSync::default(),
+
            settings: SyncSettings::default(),
            direction: SyncDirection::default(),
        }
    }
}

-
/// Repository sync settings.
-
#[derive(Debug, Clone, PartialEq, Eq)]
-
pub struct RepoSync {
-
    /// Sync with at least N replicas.
-
    pub replicas: usize,
-
    /// Sync with the given list of seeds.
-
    pub seeds: BTreeSet<NodeId>,
-
}
-

-
impl RepoSync {
-
    pub fn from_seeds(seeds: impl IntoIterator<Item = NodeId>) -> Self {
-
        let seeds = BTreeSet::from_iter(seeds);
-
        Self {
-
            replicas: seeds.len(),
-
            seeds,
-
        }
-
    }
-

-
    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
-
    /// and removing the local node from the set.
-
    pub fn with_profile(mut self, profile: &Profile) -> Self {
-
        // If no seeds were specified, add up to `replica` seeds from the preferred seeds.
-
        if self.seeds.is_empty() {
-
            self.seeds = profile
-
                .config
-
                .preferred_seeds
-
                .iter()
-
                .map(|p| p.id)
-
                .take(self.replicas)
-
                .collect();
-
        }
-
        // Remove our local node from the seed set just in case it was added by mistake.
-
        self.seeds.remove(profile.id());
-
        self
-
    }
-
}
-

-
impl Default for RepoSync {
-
    fn default() -> Self {
-
        Self {
-
            replicas: 3,
-
            seeds: BTreeSet::new(),
-
        }
-
    }
-
}
-

#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub enum SyncDirection {
    Fetch,
@@ -174,7 +129,6 @@ pub struct Options {
    pub rid: Option<RepoId>,
    pub debug: bool,
    pub verbose: bool,
-
    pub timeout: time::Duration,
    pub sort_by: SortBy,
    pub op: Operation,
}
@@ -266,16 +220,12 @@ impl Args for Options {
                (false, true) => SyncDirection::Announce,
            };
            let settings = if seeds.is_empty() {
-
                RepoSync {
-
                    replicas: replicas.unwrap_or(3),
-
                    seeds,
-
                }
+
                SyncSettings::from_replicas(replicas.unwrap_or(3))
            } else {
-
                RepoSync {
-
                    replicas: replicas.unwrap_or(seeds.len()),
-
                    seeds,
-
                }
-
            };
+
                SyncSettings::from_seeds(seeds)
+
            }
+
            .timeout(timeout);
+

            SyncMode::Repo {
                settings,
                direction,
@@ -287,7 +237,6 @@ impl Args for Options {
                rid,
                debug,
                verbose,
-
                timeout,
                sort_by,
                op: op.unwrap_or(Operation::Synchronize(sync)),
            },
@@ -335,7 +284,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                if !profile.policies()?.is_seeding(&rid)? {
                    anyhow::bail!("repository {rid} is not seeded");
                }
-
                let results = fetch(rid, settings.clone(), options.timeout, &mut node)?;
+
                let results = fetch(rid, settings.clone(), &mut node)?;
                let success = results.success().count();
                let failed = results.failed().count();

@@ -348,14 +297,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                }
            }
            if [SyncDirection::Announce, SyncDirection::Both].contains(&direction) {
-
                announce_refs(
-
                    rid,
-
                    settings,
-
                    options.timeout,
-
                    options.debug,
-
                    &mut node,
-
                    &profile,
-
                )?;
+
                announce_refs(rid, settings, options.debug, &mut node, &profile)?;
            }
        }
        Operation::Synchronize(SyncMode::Inventory) => {
@@ -436,8 +378,7 @@ fn sync_status(

fn announce_refs(
    rid: RepoId,
-
    settings: RepoSync,
-
    timeout: time::Duration,
+
    settings: SyncSettings,
    debug: bool,
    node: &mut Node,
    profile: &Profile,
@@ -447,90 +388,18 @@ fn announce_refs(
            "nothing to announce, repository {rid} is not available locally"
        ));
    };
-
    let doc = repo.identity_doc()?;
-
    let unsynced: Vec<_> = if doc.visibility.is_public() {
-
        // All seeds.
-
        let all = node.seeds(rid)?;
-
        // Seeds in sync with us.
-
        let synced = all.iter().filter(|s| s.is_synced());
-
        // Replicas not counting our local replica.
-
        let replicas = all
-
            .iter()
-
            .filter(|s| s.is_synced() && &s.nid != profile.id())
-
            .count();
-
        // Maximum replication factor we can achieve.
-
        let max_replicas = all.iter().filter(|s| &s.nid != profile.id()).count();
-
        // If the seeds we specified in the sync settings are all synced.
-
        let is_seeds_synced = {
-
            let synced = synced.map(|s| s.nid).collect::<BTreeSet<_>>();
-
            settings.seeds.iter().all(|s| synced.contains(s))
-
        };
-
        // If we met our desired replica count. Note that this can never exceed the maximum count.
-
        let is_replicas_synced = replicas >= settings.replicas.min(max_replicas);

-
        // Nothing to do if we've met our sync state.
-
        if is_seeds_synced && is_replicas_synced {
-
            term::success!("Nothing to announce, already in sync with {replicas} node(s) (see `rad sync status`)");
-
            return Ok(());
-
        }
-
        // Return nodes we can announce to.
-
        all.connected()
-
            .filter(|s| !s.is_synced())
-
            .map(|s| s.nid)
-
            .collect()
-
    } else {
-
        node.sessions()?
-
            .into_iter()
-
            .filter(|s| s.state.is_connected() && doc.is_visible_to(&s.nid))
-
            .map(|s| s.nid)
-
            .collect()
-
    };
-

-
    if unsynced.is_empty() {
-
        term::info!("Not connected to any seeds for {rid}.");
-
        return Ok(());
-
    }
+
    crate::node::announce(
+
        &repo,
+
        settings,
+
        SyncReporting {
+
            debug,
+
            ..SyncReporting::default()
+
        },
+
        node,
+
        profile,
+
    )?;

-
    let mut spinner = term::spinner(format!("Found {} seed(s)..", unsynced.len()));
-
    let result = node.announce(rid, unsynced, timeout, |event, replicas| match event {
-
        node::AnnounceEvent::Announced => ControlFlow::Continue(()),
-
        node::AnnounceEvent::RefsSynced { remote, time } => {
-
            spinner.message(format!("Synced with {remote} in {time:?}.."));
-

-
            // We're done syncing when both of these conditions are met:
-
            //
-
            // 1. We've matched or exceeded our target replica count.
-
            // 2. We've synced with the seeds specified manually.
-
            if replicas.len() >= settings.replicas
-
                && settings.seeds.iter().all(|s| replicas.contains_key(s))
-
            {
-
                ControlFlow::Break(())
-
            } else {
-
                ControlFlow::Continue(())
-
            }
-
        }
-
    })?;
-

-
    if result.synced.is_empty() {
-
        spinner.failed();
-
    } else {
-
        spinner.message(format!("Synced with {} node(s)", result.synced.len()));
-
        spinner.finish();
-
        if debug {
-
            for (seed, time) in &result.synced {
-
                term::println(
-
                    " ",
-
                    term::format::dim(format!("Synced with {seed} in {time:?}")),
-
                );
-
            }
-
        }
-
    }
-
    for seed in result.timeout {
-
        term::notice!("Seed {seed} timed out..");
-
    }
-
    if result.synced.is_empty() {
-
        anyhow::bail!("all seeds timed out");
-
    }
    Ok(())
}

@@ -546,8 +415,7 @@ pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {

pub fn fetch(
    rid: RepoId,
-
    settings: RepoSync,
-
    timeout: time::Duration,
+
    settings: SyncSettings,
    node: &mut Node,
) -> Result<FetchResults, node::Error> {
    let local = node.nid()?;
@@ -567,7 +435,7 @@ pub fn fetch(
            term::warning(format!("node {nid} is not connected.. skipping"));
            continue;
        }
-
        let result = fetch_from(rid, nid, timeout, node)?;
+
        let result = fetch_from(rid, nid, settings.timeout, node)?;
        results.push(*nid, result);
    }
    if results.success().count() >= replicas {
@@ -582,7 +450,7 @@ pub fn fetch(
        .take(replicas)
        .collect::<Vec<_>>();
    for nid in connected {
-
        let result = fetch_from(rid, &nid, timeout, node)?;
+
        let result = fetch_from(rid, &nid, settings.timeout, node)?;
        results.push(nid, result);
    }

@@ -598,10 +466,10 @@ pub fn fetch(
        if connect(
            seed.nid,
            seed.addrs.into_iter().map(|ka| ka.addr),
-
            timeout,
+
            settings.timeout,
            node,
        )? {
-
            let result = fetch_from(rid, &seed.nid, timeout, node)?;
+
            let result = fetch_from(rid, &seed.nid, settings.timeout, node)?;
            results.push(seed.nid, result);
        }
    }
modified radicle-cli/src/node.rs
@@ -1,44 +1,274 @@
+
use core::time;
+
use std::collections::BTreeSet;
+
use std::io;
+
use std::io::Write;
use std::ops::ControlFlow;
-
use std::time::Duration;

-
use radicle::identity::RepoId;
-
use radicle::node;
-
use radicle::node::Handle as _;
-
use radicle::Node;
+
use radicle::node::{self, AnnounceResult};
+
use radicle::node::{Handle as _, NodeId};
+
use radicle::storage::{ReadRepository, RepositoryError};
+
use radicle::{Node, Profile};
+
use radicle_term::format;

use crate::terminal as term;

+
/// Default time to wait for syncing to complete.
+
pub const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
+

+
/// Repository sync settings.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct SyncSettings {
+
    /// Sync with at least N replicas.
+
    pub replicas: usize,
+
    /// Sync with the given list of seeds.
+
    pub seeds: BTreeSet<NodeId>,
+
    /// How long to wait for syncing to complete.
+
    pub timeout: time::Duration,
+
}
+

+
impl SyncSettings {
+
    /// Create a [`RepoSync`] from a list of seeds.
+
    pub fn from_seeds(seeds: impl IntoIterator<Item = NodeId>) -> Self {
+
        let seeds = BTreeSet::from_iter(seeds);
+
        Self {
+
            replicas: seeds.len(),
+
            seeds,
+
            timeout: DEFAULT_SYNC_TIMEOUT,
+
        }
+
    }
+

+
    /// Create a [`RepoSync`] from a replica count.
+
    pub fn from_replicas(replicas: usize) -> Self {
+
        Self {
+
            replicas,
+
            ..Self::default()
+
        }
+
    }
+

+
    /// Set sync timeout. Defaults to [`DEFAULT_SYNC_TIMEOUT`].
+
    pub fn timeout(mut self, timeout: time::Duration) -> Self {
+
        self.timeout = timeout;
+
        self
+
    }
+

+
    /// Use profile to populate sync settings, by adding preferred seeds if no seeds are specified,
+
    /// and removing the local node from the set.
+
    pub fn with_profile(mut self, profile: &Profile) -> Self {
+
        // If no seeds were specified, add up to `replica` seeds from the preferred seeds.
+
        if self.seeds.is_empty() {
+
            self.seeds = profile
+
                .config
+
                .preferred_seeds
+
                .iter()
+
                .map(|p| p.id)
+
                .take(self.replicas)
+
                .collect();
+
        }
+
        // Remove our local node from the seed set just in case it was added by mistake.
+
        self.seeds.remove(profile.id());
+
        self
+
    }
+
}
+

+
impl Default for SyncSettings {
+
    fn default() -> Self {
+
        Self {
+
            replicas: 3,
+
            seeds: BTreeSet::new(),
+
            timeout: DEFAULT_SYNC_TIMEOUT,
+
        }
+
    }
+
}
+

+
/// Error while syncing.
+
#[derive(thiserror::Error, Debug)]
+
pub enum SyncError {
+
    #[error(transparent)]
+
    Repository(#[from] RepositoryError),
+
    #[error(transparent)]
+
    Node(#[from] radicle::node::Error),
+
    #[error("all seeds timed out")]
+
    AllSeedsTimedOut,
+
}
+

+
impl SyncError {
+
    fn is_connection_err(&self) -> bool {
+
        match self {
+
            Self::Node(e) => e.is_connection_err(),
+
            _ => false,
+
        }
+
    }
+
}
+

+
/// Writes sync output.
+
#[derive(Debug)]
+
pub enum SyncWriter {
+
    /// Write to standard out.
+
    Stdout(io::Stdout),
+
    /// Write to standard error.
+
    Stderr(io::Stderr),
+
    /// Discard output, like [`std::io::sink`].
+
    Sink,
+
}
+

+
impl Clone for SyncWriter {
+
    fn clone(&self) -> Self {
+
        match self {
+
            Self::Stdout(_) => Self::Stdout(io::stdout()),
+
            Self::Stderr(_) => Self::Stderr(io::stderr()),
+
            Self::Sink => Self::Sink,
+
        }
+
    }
+
}
+

+
impl io::Write for SyncWriter {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        match self {
+
            Self::Stdout(stdout) => stdout.write(buf),
+
            Self::Stderr(stderr) => stderr.write(buf),
+
            Self::Sink => Ok(buf.len()),
+
        }
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        match self {
+
            Self::Stdout(stdout) => stdout.flush(),
+
            Self::Stderr(stderr) => stderr.flush(),
+
            Self::Sink => Ok(()),
+
        }
+
    }
+
}
+

+
/// Configures how sync progress is reported.
+
pub struct SyncReporting {
+
    /// Progress messages or animations.
+
    pub progress: SyncWriter,
+
    /// Completion messages.
+
    pub completion: SyncWriter,
+
    /// Debug output.
+
    pub debug: bool,
+
}
+

+
impl Default for SyncReporting {
+
    fn default() -> Self {
+
        Self {
+
            progress: SyncWriter::Stderr(io::stderr()),
+
            completion: SyncWriter::Stdout(io::stdout()),
+
            debug: false,
+
        }
+
    }
+
}
+

/// Announce changes to the network.
-
pub fn announce(rid: RepoId, node: &mut Node) -> anyhow::Result<()> {
-
    match announce_(rid, node) {
-
        Ok(()) => Ok(()),
+
pub fn announce<R: ReadRepository>(
+
    repo: &R,
+
    settings: SyncSettings,
+
    reporting: SyncReporting,
+
    node: &mut Node,
+
    profile: &Profile,
+
) -> Result<AnnounceResult, SyncError> {
+
    match announce_(repo, settings, reporting, node, profile) {
+
        Ok(result) => Ok(result),
        Err(e) if e.is_connection_err() => {
            term::hint("Node is stopped. To announce changes to the network, start it with `rad node start`.");
-
            Ok(())
+
            Ok(AnnounceResult::default())
        }
-
        Err(e) => Err(e.into()),
+
        Err(e) => Err(e),
    }
}

-
fn announce_(rid: RepoId, node: &mut Node) -> Result<(), radicle::node::Error> {
-
    let seeds = node.seeds(rid)?;
-
    let connected = seeds.connected().map(|s| s.nid).collect::<Vec<_>>();
+
fn announce_<R: ReadRepository>(
+
    repo: &R,
+
    settings: SyncSettings,
+
    mut reporting: SyncReporting,
+
    node: &mut Node,
+
    profile: &Profile,
+
) -> Result<AnnounceResult, SyncError> {
+
    let rid = repo.id();
+
    let doc = repo.identity_doc()?;
+
    let mut settings = settings.with_profile(profile);
+
    let unsynced: Vec<_> = if doc.visibility.is_public() {
+
        // All seeds.
+
        let all = node.seeds(rid)?;
+
        if all.is_empty() {
+
            term::info!(&mut reporting.completion; "No seeds found for {rid}.");
+
            return Ok(AnnounceResult::default());
+
        }
+
        // Seeds in sync with us.
+
        let synced = all
+
            .iter()
+
            .filter(|s| s.is_synced())
+
            .map(|s| s.nid)
+
            .collect::<BTreeSet<_>>();
+
        // Replicas not counting our local replica.
+
        let replicas = synced.iter().filter(|nid| *nid != profile.id()).count();
+
        // Maximum replication factor we can achieve.
+
        let max_replicas = all.iter().filter(|s| &s.nid != profile.id()).count();
+
        // If the seeds we specified in the sync settings are all synced.
+
        let is_seeds_synced = settings.seeds.iter().all(|s| synced.contains(s));
+
        // If we met our desired replica count. Note that this can never exceed the maximum count.
+
        let is_replicas_synced = replicas >= settings.replicas.min(max_replicas);

-
    if connected.is_empty() {
-
        term::info!("Not connected to any seeds.");
-
        return Ok(());
+
        // Nothing to do if we've met our sync state.
+
        if is_seeds_synced && is_replicas_synced {
+
            term::success!(
+
                &mut reporting.completion;
+
                "Nothing to announce, already in sync with {replicas} node(s) (see `rad sync status`)"
+
            );
+
            return Ok(AnnounceResult::default());
+
        }
+
        // Return nodes we can announce to. They don't have to be connected directly.
+
        all.iter()
+
            .filter(|s| !s.is_synced() && &s.nid != profile.id())
+
            .map(|s| s.nid)
+
            .collect()
+
    } else {
+
        node.sessions()?
+
            .into_iter()
+
            .filter(|s| s.state.is_connected() && doc.is_visible_to(&s.nid))
+
            .map(|s| s.nid)
+
            .collect()
+
    };
+

+
    if unsynced.is_empty() {
+
        term::info!(&mut reporting.completion; "No seeds to announce to for {rid}. (see `rad sync status`)");
+
        return Ok(AnnounceResult::default());
    }
+
    // Cap the replicas to the maximum achievable.
+
    // Nb. It's impossible to know if a replica follows our node. This means that if we announce
+
    // only our refs, and the replica doesn't follow us, it won't fetch from us.
+
    settings.replicas = settings.replicas.min(unsynced.len());

-
    let mut spinner = term::spinner(format!("Syncing with {} node(s)..", connected.len()));
+
    let mut spinner = term::spinner_to(
+
        format!("Found {} seed(s)..", unsynced.len()),
+
        reporting.completion.clone(),
+
        reporting.progress.clone(),
+
    );
    let result = node.announce(
        rid,
-
        connected,
-
        Duration::from_secs(9),
-
        |event, _| match event {
+
        unsynced,
+
        settings.timeout,
+
        |event, replicas| match event {
            node::AnnounceEvent::Announced => ControlFlow::Continue(()),
            node::AnnounceEvent::RefsSynced { remote, time } => {
-
                spinner.message(format!("Synced with {remote} in {time:?}.."));
-
                ControlFlow::Continue(())
+
                spinner.message(format!(
+
                    "Synced with {} in {}..",
+
                    format::dim(remote),
+
                    format::dim(format!("{time:?}"))
+
                ));
+

+
                // We're done syncing when both of these conditions are met:
+
                //
+
                // 1. We've matched or exceeded our target replica count.
+
                // 2. We've synced with one of the seeds specified manually.
+
                if replicas.len() >= settings.replicas
+
                    && (settings.seeds.is_empty()
+
                        || settings.seeds.iter().any(|s| replicas.contains_key(s)))
+
                {
+
                    ControlFlow::Break(())
+
                } else {
+
                    ControlFlow::Continue(())
+
                }
            }
        },
    )?;
@@ -48,6 +278,25 @@ fn announce_(rid: RepoId, node: &mut Node) -> Result<(), radicle::node::Error> {
    } else {
        spinner.message(format!("Synced with {} node(s)", result.synced.len()));
        spinner.finish();
+

+
        if reporting.debug {
+
            for (seed, time) in &result.synced {
+
                writeln!(
+
                    &mut reporting.completion,
+
                    "  {}",
+
                    term::format::dim(format!("Synced with {seed} in {time:?}")),
+
                )
+
                .ok();
+
            }
+
        }
+
    }
+
    for seed in &result.timed_out {
+
        if settings.seeds.contains(seed) {
+
            term::notice!(&mut reporting.completion; "Seed {seed} timed out..");
+
        }
+
    }
+
    if result.synced.is_empty() {
+
        return Err(SyncError::AllSeedsTimedOut);
    }
-
    Ok(())
+
    Ok(result)
}
modified radicle-cli/tests/commands.rs
@@ -332,7 +332,7 @@ fn rad_id() {

    let events = alice.handle.events();
    bob.fork(acme, bob.home.path()).unwrap();
-
    bob.announce(acme, bob.home.path()).unwrap();
+
    bob.announce(acme, 2, bob.home.path()).unwrap();
    alice.has_inventory_of(&acme, &bob.id);

    // Alice must have Bob to try add them as a delegate
@@ -540,7 +540,7 @@ fn rad_id_conflict() {
    alice.connect(&bob).converge([&bob]);

    bob.fork(acme, working.join("bob")).unwrap();
-
    bob.announce(acme, bob.home.path()).unwrap();
+
    bob.announce(acme, 2, bob.home.path()).unwrap();
    alice.has_inventory_of(&acme, &bob.id);

    formula(&environment.tmp(), "examples/rad-id-conflict.md")
@@ -982,7 +982,7 @@ fn rad_clean() {
    eve.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();

    bob.fork(acme, bob.home.path()).unwrap();
-
    bob.announce(acme, bob.home.path()).unwrap();
+
    bob.announce(acme, 1, bob.home.path()).unwrap();
    bob.has_inventory_of(&acme, &alice.id);
    alice.has_inventory_of(&acme, &bob.id);
    eve.has_inventory_of(&acme, &alice.id);
@@ -1116,7 +1116,7 @@ fn rad_clone_all() {

    // Fork and sync repo.
    bob.fork(acme, bob.home.path()).unwrap();
-
    bob.announce(acme, bob.home.path()).unwrap();
+
    bob.announce(acme, 2, bob.home.path()).unwrap();
    bob.has_inventory_of(&acme, &alice.id);
    alice.has_inventory_of(&acme, &bob.id);

@@ -1762,13 +1762,13 @@ fn rad_remote() {
    bob.connect(&alice);
    bob.routes_to(&[(rid, alice.id)]);
    bob.fork(rid, bob.home.path()).unwrap();
-
    bob.announce(rid, bob.home.path()).unwrap();
+
    bob.announce(rid, 2, bob.home.path()).unwrap();
    alice.has_inventory_of(&rid, &bob.id);

    eve.connect(&bob);
    eve.routes_to(&[(rid, alice.id)]);
    eve.fork(rid, eve.home.path()).unwrap();
-
    eve.announce(rid, eve.home.path()).unwrap();
+
    eve.announce(rid, 2, eve.home.path()).unwrap();
    alice.has_inventory_of(&rid, &eve.id);

    test(
modified radicle-node/src/service.rs
@@ -1054,7 +1054,7 @@ where
                        self.fetch_refs_at(rid, from, refs, FETCH_TIMEOUT, channel);
                        return;
                    } else {
-
                        debug!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
+
                        trace!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
                    }
                }
                Err(e) => {
modified radicle-node/src/test/environment.rs
@@ -308,14 +308,23 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
    pub fn fork<P: AsRef<Path>>(&self, rid: RepoId, cwd: P) -> io::Result<()> {
        self.clone(rid, &cwd)?;
        self.rad("fork", &[rid.to_string().as_str()], &cwd)?;
-
        self.announce(rid, &cwd)?;
+
        self.announce(rid, 1, &cwd)?;

        Ok(())
    }

    /// Announce a repo.
-
    pub fn announce<P: AsRef<Path>>(&self, rid: RepoId, cwd: P) -> io::Result<()> {
-
        self.rad("sync", &[rid.to_string().as_str(), "--announce"], cwd)
+
    pub fn announce<P: AsRef<Path>>(&self, rid: RepoId, replicas: usize, cwd: P) -> io::Result<()> {
+
        self.rad(
+
            "sync",
+
            &[
+
                rid.to_string().as_str(),
+
                "--announce",
+
                "--replicas",
+
                replicas.to_string().as_str(),
+
            ],
+
            cwd,
+
        )
    }

    /// Init a repo.
modified radicle-remote-helper/src/lib.rs
@@ -77,6 +77,8 @@ pub struct Allow {
pub struct Options {
    /// Don't sync after push.
    no_sync: bool,
+
    /// Sync debugging.
+
    sync_debug: bool,
    /// Enable hints.
    hints: bool,
    /// Open patch in draft mode.
@@ -210,6 +212,7 @@ fn push_option(args: &[&str], opts: &mut Options) -> Result<(), Error> {
    match args {
        ["hints"] => opts.hints = true,
        ["sync"] => opts.no_sync = false,
+
        ["sync.debug"] => opts.sync_debug = true,
        ["no-sync"] => opts.no_sync = true,
        ["patch.draft"] => opts.draft = true,
        ["allow.rollback"] => opts.allow.rollback = true,
modified radicle-remote-helper/src/push.rs
@@ -1,10 +1,8 @@
#![allow(clippy::too_many_arguments)]
-
use std::collections::{HashMap, HashSet};
+
use std::collections::HashMap;
use std::io::IsTerminal;
-
use std::ops::ControlFlow;
use std::path::Path;
use std::str::FromStr;
-
use std::time;
use std::{assert_eq, io};

use thiserror::Error;
@@ -18,19 +16,16 @@ use radicle::explorer::ExplorerResource;
use radicle::identity::Did;
use radicle::node;
use radicle::node::{Handle, NodeId};
-
use radicle::prelude::RepoId;
use radicle::storage;
use radicle::storage::git::transport::local::Url;
use radicle::storage::{ReadRepository, SignRepository as _, WriteRepository};
use radicle::Profile;
use radicle::{git, rad};
-
use radicle_cli::terminal as cli;
+
use radicle_cli as cli;
+
use radicle_cli::terminal as term;

use crate::{hint, read_line, Options};

-
/// Default timeout for syncing to the network after a push.
-
const DEFAULT_SYNC_TIMEOUT: time::Duration = time::Duration::from_secs(9);
-

#[derive(Debug, Error)]
pub enum Error {
    /// Public key doesn't match the remote namespace we're pushing to.
@@ -89,7 +84,7 @@ pub enum Error {
    PatchCache(#[from] patch::cache::Error),
    /// Patch edit message error.
    #[error(transparent)]
-
    PatchEdit(#[from] cli::patch::Error),
+
    PatchEdit(#[from] term::patch::Error),
    /// Policy config error.
    #[error("node policy: {0}")]
    Policy(#[from] node::policy::config::Error),
@@ -329,8 +324,8 @@ pub fn run(
        if head.is_updated() {
            eprintln!(
                "{} Canonical head updated to {}",
-
                cli::format::positive("✓"),
-
                cli::format::secondary(head.new),
+
                term::format::positive("✓"),
+
                term::format::secondary(head.new),
            );
        }

@@ -342,7 +337,7 @@ pub fn run(
                let node = radicle::Node::new(profile.socket());
                if node.is_running() {
                    // Nb. allow this to fail. The push to local storage was still successful.
-
                    sync(stored.id, ok.into_values().flatten(), node, profile).ok();
+
                    sync(stored, ok.into_values().flatten(), opts, node, profile).ok();
                } else if hints {
                    hint("offline push, your node is not running");
                    hint("to sync with the network, run `rad node start`");
@@ -397,7 +392,7 @@ fn patch_open<G: Signer>(
        return Err(Error::EmptyPatch);
    }
    let (title, description) =
-
        cli::patch::get_create_message(opts.message, &stored.backend, &base, &head)?;
+
        term::patch::get_create_message(opts.message, &stored.backend, &base, &head)?;

    let patch = if opts.draft {
        patches.draft(
@@ -431,8 +426,8 @@ fn patch_open<G: Signer>(

            eprintln!(
                "{} Patch {} {action}",
-
                cli::format::positive("✓"),
-
                cli::format::tertiary(patch),
+
                term::format::positive("✓"),
+
                term::format::tertiary(patch),
            );

            // Create long-lived patch head reference, now that we know the Patch ID.
@@ -509,7 +504,7 @@ fn patch_update<G: Signer>(
    if patch.revisions().any(|(_, r)| *r.head() == commit.id()) {
        return Ok(None);
    }
-
    let message = cli::patch::get_update_message(
+
    let message = term::patch::get_update_message(
        opts.message,
        &stored.backend,
        patch.latest().1,
@@ -527,9 +522,9 @@ fn patch_update<G: Signer>(

    eprintln!(
        "{} Patch {} updated to revision {}",
-
        cli::format::positive("✓"),
-
        cli::format::tertiary(cli::format::cob(&patch_id)),
-
        cli::format::dim(revision)
+
        term::format::positive("✓"),
+
        term::format::tertiary(term::format::cob(&patch_id)),
+
        term::format::dim(revision)
    );

    // In this case, the patch was already merged via git, and pushed to storage.
@@ -641,15 +636,15 @@ fn patch_merge<C: cob::cache::Update<patch::Patch>, G: Signer>(
    if revision == latest {
        eprintln!(
            "{} Patch {} merged",
-
            cli::format::positive("✓"),
-
            cli::format::tertiary(merged.patch)
+
            term::format::positive("✓"),
+
            term::format::tertiary(merged.patch)
        );
    } else {
        eprintln!(
            "{} Patch {} merged at revision {}",
-
            cli::format::positive("✓"),
-
            cli::format::tertiary(merged.patch),
-
            cli::format::dim(cli::format::oid(revision)),
+
            term::format::positive("✓"),
+
            term::format::tertiary(merged.patch),
+
            term::format::dim(term::format::oid(revision)),
        );
    }

@@ -681,57 +676,38 @@ fn push_ref(

/// Sync with the network.
fn sync(
-
    rid: RepoId,
+
    repo: &storage::git::Repository,
    updated: impl Iterator<Item = ExplorerResource>,
+
    opts: Options,
    mut node: radicle::Node,
    profile: &Profile,
-
) -> Result<(), radicle::node::Error> {
-
    let seeds = node.seeds(rid)?;
-
    let connected = seeds.connected().map(|s| s.nid).collect::<Vec<_>>();
-
    let mut replicated = HashSet::new();
-

-
    if connected.is_empty() {
-
        eprintln!("Not connected to any seeds.");
-
        return Ok(());
-
    }
-
    let message = format!("Syncing with {} node(s)..", connected.len());
-
    let mut spinner = if io::stderr().is_terminal() {
-
        cli::spinner_to(message, io::stderr(), io::stderr())
+
) -> Result<(), cli::node::SyncError> {
+
    let progress = if io::stderr().is_terminal() {
+
        cli::node::SyncWriter::Stderr(io::stderr())
    } else {
-
        cli::spinner_to(message, io::stderr(), io::sink())
+
        cli::node::SyncWriter::Sink
    };
-
    let result = node.announce(
-
        rid,
-
        connected,
-
        DEFAULT_SYNC_TIMEOUT,
-
        |event, _| match event {
-
            node::AnnounceEvent::Announced => ControlFlow::Continue(()),
-
            node::AnnounceEvent::RefsSynced { remote, time } => {
-
                replicated.insert(remote);
-
                spinner.message(format!(
-
                    "Synced with {} in {time:?}..",
-
                    cli::format::dim(remote)
-
                ));
-
                ControlFlow::Continue(())
-
            }
+
    let result = cli::node::announce(
+
        repo,
+
        cli::node::SyncSettings::default().with_profile(profile),
+
        cli::node::SyncReporting {
+
            progress,
+
            completion: cli::node::SyncWriter::Stderr(io::stderr()),
+
            debug: opts.sync_debug,
        },
+
        &mut node,
+
        profile,
    )?;

-
    if result.synced.is_empty() {
-
        spinner.failed();
-
    } else {
-
        spinner.message(format!("Synced with {} node(s)", result.synced.len()));
-
        spinner.finish();
-
    }
    let mut urls = Vec::new();

    for seed in profile.config.preferred_seeds.iter() {
-
        if replicated.contains(&seed.id) {
+
        if result.synced(&seed.id).is_some() {
            for resource in updated {
                let url = profile
                    .config
                    .public_explorer
-
                    .url(seed.addr.host.clone(), rid)
+
                    .url(seed.addr.host.clone(), repo.id)
                    .resource(resource);

                urls.push(url);
@@ -744,7 +720,7 @@ fn sync(
    if !urls.is_empty() {
        eprintln!();
        for url in urls {
-
            eprintln!("  {}", cli::format::dim(url));
+
            eprintln!("  {}", term::format::dim(url));
        }
        eprintln!();
    }
modified radicle-term/src/io.rs
@@ -43,6 +43,9 @@ pub static CONFIG: Lazy<RenderConfig> = Lazy::new(|| RenderConfig {

#[macro_export]
macro_rules! info {
+
    ($writer:expr; $($arg:tt)*) => ({
+
        writeln!($writer, $($arg)*).ok();
+
    });
    ($($arg:tt)*) => ({
        println!("{}", format_args!($($arg)*));
    })
@@ -50,9 +53,14 @@ macro_rules! info {

#[macro_export]
macro_rules! success {
+
    // Pattern when a writer is provided.
+
    ($writer:expr; $($arg:tt)*) => ({
+
        $crate::io::success_args($writer, format_args!($($arg)*));
+
    });
+
    // Pattern without writer.
    ($($arg:tt)*) => ({
-
        $crate::io::success_args(format_args!($($arg)*));
-
    })
+
        $crate::io::success_args(&mut std::io::stdout(), format_args!($($arg)*));
+
    });
}

#[macro_export]
@@ -64,8 +72,12 @@ macro_rules! tip {

#[macro_export]
macro_rules! notice {
+
    // Pattern when a writer is provided.
+
    ($writer:expr; $($arg:tt)*) => ({
+
        $crate::io::notice_args($writer, format_args!($($arg)*));
+
    });
    ($($arg:tt)*) => ({
-
        $crate::io::notice_args(format_args!($($arg)*));
+
        $crate::io::notice_args(&mut std::io::stdout(), format_args!($($arg)*));
    })
}

@@ -74,8 +86,8 @@ pub use notice;
pub use success;
pub use tip;

-
pub fn success_args(args: fmt::Arguments) {
-
    println!("{} {args}", Paint::green("✓"));
+
pub fn success_args<W: io::Write>(w: &mut W, args: fmt::Arguments) {
+
    writeln!(w, "{} {args}", Paint::green("✓")).ok();
}

pub fn tip_args(args: fmt::Arguments) {
@@ -86,8 +98,8 @@ pub fn tip_args(args: fmt::Arguments) {
    );
}

-
pub fn notice_args(args: fmt::Arguments) {
-
    println!("{} {args}", Paint::new("!").dim());
+
pub fn notice_args<W: io::Write>(w: &mut W, args: fmt::Arguments) {
+
    writeln!(w, "{} {args}", Paint::new("!").dim()).ok();
}

pub fn columns() -> Option<usize> {
modified radicle/src/node.rs
@@ -637,14 +637,24 @@ impl From<Vec<Seed>> for Seeds {
}

/// Announcement result returned by [`Node::announce`].
-
#[derive(Debug)]
+
#[derive(Debug, Default)]
pub struct AnnounceResult {
    /// Nodes that timed out.
-
    pub timeout: Vec<NodeId>,
+
    pub timed_out: Vec<NodeId>,
    /// Nodes that synced.
    pub synced: Vec<(NodeId, time::Duration)>,
}

+
impl AnnounceResult {
+
    /// Check if a node synced successfully.
+
    pub fn synced(&self, nid: &NodeId) -> Option<time::Duration> {
+
        self.synced
+
            .iter()
+
            .find(|(id, _)| id == nid)
+
            .map(|(_, time)| *time)
+
    }
+
}
+

/// A sync event, emitted by [`Node::announce`].
#[derive(Debug)]
pub enum AnnounceEvent {
@@ -937,34 +947,34 @@ impl Node {

        let mut unsynced = seeds.into_iter().collect::<BTreeSet<_>>();
        let mut synced = HashMap::new();
-
        let mut timeout: Vec<NodeId> = Vec::new();
+
        let mut timed_out: Vec<NodeId> = Vec::new();
        let started = time::Instant::now();

        callback(AnnounceEvent::Announced, &synced);

        for e in events {
+
            let elapsed = started.elapsed();
+
            if elapsed >= timeout {
+
                timed_out.extend(unsynced.iter());
+
                break;
+
            }
            match e {
                Ok(Event::RefsSynced {
                    remote,
                    rid: rid_,
                    at,
                }) if rid == rid_ && refs.at == at => {
-
                    let elapsed = started.elapsed();
                    log::debug!(target: "radicle", "Received {e:?}");

                    unsynced.remove(&remote);
                    // We can receive synced events from nodes we didn't directly announce to,
                    // and it's possible to receive duplicates as well.
                    if synced.insert(remote, elapsed).is_none() {
-
                        if callback(
-
                            AnnounceEvent::RefsSynced {
-
                                remote,
-
                                time: elapsed,
-
                            },
-
                            &synced,
-
                        )
-
                        .is_break()
-
                        {
+
                        let event = AnnounceEvent::RefsSynced {
+
                            remote,
+
                            time: elapsed,
+
                        };
+
                        if callback(event, &synced).is_break() {
                            break;
                        }
                    }
@@ -972,7 +982,7 @@ impl Node {
                Ok(_) => {}

                Err(Error::TimedOut) => {
-
                    timeout.extend(unsynced.iter());
+
                    timed_out.extend(unsynced.iter());
                    break;
                }
                Err(e) => return Err(e),
@@ -981,8 +991,9 @@ impl Node {
                break;
            }
        }
+

        Ok(AnnounceResult {
-
            timeout,
+
            timed_out,
            synced: synced.into_iter().collect(),
        })
    }
modified radicle/src/node/seed/store.rs
@@ -59,7 +59,7 @@ impl Store for Database {
             VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT DO UPDATE
             SET head = ?3, timestamp = ?4
-
             WHERE timestamp < ?4",
+
             WHERE timestamp < ?4 AND head <> ?3",
        )?;
        stmt.bind((1, rid))?;
        stmt.bind((2, nid))?;