Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cli: Move `announce` function to `node` library
Alexis Sellier committed 2 years ago
commit 45a457872416281f60b1e115b08c4220cd4ec14b
parent 8e8997693718c21e577323f689bcf422ab8079f0
2 files changed +74 -36
modified radicle-cli/src/commands/sync.rs
@@ -1,12 +1,11 @@
-
use std::collections::BTreeSet;
use std::ffi::OsString;
use std::path::Path;
-
use std::{io, time};
+
use std::time;

use anyhow::{anyhow, Context as _};

use radicle::node;
-
use radicle::node::{Event, FetchResult, FetchResults, Handle as _, Node};
+
use radicle::node::{FetchResult, FetchResults, Handle as _, Node};
use radicle::prelude::{Id, NodeId, Profile};

use crate::terminal as term;
@@ -149,51 +148,31 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
}

fn announce(rid: Id, mut node: Node, timeout: time::Duration) -> anyhow::Result<()> {
-
    let events = node.subscribe(timeout)?;
    let seeds = node.seeds(rid)?;
-
    let mut seeds = seeds.connected().collect::<BTreeSet<_>>();
-

-
    if seeds.is_empty() {
+
    if !seeds.has_connections() {
        term::info!("Not connected to any seeds.");
        return Ok(());
    }
-
    node.announce_refs(rid)?;
-

-
    let mut spinner = term::spinner(format!("Syncing with {} node(s)..", seeds.len()));
-
    let mut synced = Vec::new();
-
    let mut timeout: Vec<NodeId> = Vec::new();
-

-
    for e in events {
-
        match e {
-
            Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => {
-
                seeds.remove(&remote);
-
                synced.push(remote);
-
                spinner.message(format!("Synced with {remote}.."));
-
            }
-
            Ok(_) => {}
-
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
-
                timeout.extend(seeds.into_iter());
-
                break;
-
            }
-
            Err(e) => return Err(e.into()),
-
        }
-
        if seeds.is_empty() {
-
            break;
+

+
    let connected = seeds.connected().cloned().collect::<Vec<_>>();
+
    let mut spinner = term::spinner(format!("Syncing with {} node(s)..", connected.len()));
+
    let result = node.announce(rid, connected, timeout, |event| match event {
+
        node::AnnounceEvent::Announced => {}
+
        node::AnnounceEvent::RefsSynced { remote } => {
+
            spinner.message(format!("Synced with {remote}.."));
        }
-
    }
+
    })?;

-
    if synced.is_empty() {
+
    if result.synced.is_empty() {
        spinner.failed();
    } else {
-
        spinner.message(format!("Synced with {} node(s)", synced.len()));
+
        spinner.message(format!("Synced with {} node(s)", result.synced.len()));
        spinner.finish();
    }
-

-
    for seed in timeout {
+
    for seed in result.timeout {
        term::notice!("Seed {seed} timed out..");
    }
-

-
    if synced.is_empty() {
+
    if result.synced.is_empty() {
        anyhow::bail!("all seeds timed out");
    }
    Ok(())
modified radicle/src/node.rs
@@ -243,6 +243,22 @@ impl Seeds {
    }
}

+
/// Announcement result returned by [`announce`].
+
pub struct AnnounceResult {
+
    /// Nodes that timed out.
+
    pub timeout: Vec<NodeId>,
+
    /// Nodes that synced.
+
    pub synced: Vec<NodeId>,
+
}
+

+
/// A sync event, emitted by [`announce`].
+
pub enum AnnounceEvent {
+
    /// Refs were synced with the given node.
+
    RefsSynced { remote: NodeId },
+
    /// Refs were announced to all given nodes.
+
    Announced,
+
}
+

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "kebab-case")]
pub enum FetchResult {
@@ -465,6 +481,49 @@ impl Node {
            Ok(v)
        }))
    }
+

+
    /// Announce refs of the given `rid` to the given seeds.
+
    /// Waits for the seeds to acknowledge the refs or times out if no acknowledgments are received
+
    /// within the given time.
+
    pub fn announce(
+
        &mut self,
+
        rid: Id,
+
        seeds: impl IntoIterator<Item = NodeId>,
+
        timeout: time::Duration,
+
        mut callback: impl FnMut(AnnounceEvent),
+
    ) -> Result<AnnounceResult, Error> {
+
        let events = self.subscribe(timeout)?;
+
        let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>();
+

+
        self.announce_refs(rid)?;
+

+
        callback(AnnounceEvent::Announced);
+

+
        let mut synced = Vec::new();
+
        let mut timeout: Vec<NodeId> = Vec::new();
+

+
        for e in events {
+
            match e {
+
                Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => {
+
                    seeds.remove(&remote);
+
                    synced.push(remote);
+

+
                    callback(AnnounceEvent::RefsSynced { remote });
+
                }
+
                Ok(_) => {}
+

+
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+
                    timeout.extend(seeds.iter());
+
                    break;
+
                }
+
                Err(e) => return Err(e.into()),
+
            }
+
            if seeds.is_empty() {
+
                break;
+
            }
+
        }
+
        Ok(AnnounceResult { timeout, synced })
+
    }
}

// TODO(finto): repo_policies, node_policies, and routing should all