Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
cli: Try to connect to seeds specified as options
Merged did:key:z6MksFqX...wzpT opened 1 year ago

When fetching with a --seed specified on the CLI, try to connect to it if not already connected.

11 files changed +111 -27 11a6ec5d f6aa46a2
modified radicle-cli/examples/rad-init-private-seed.md
@@ -43,6 +43,6 @@ seed succeeds.
``` ~bob
$ rad sync rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu --fetch --seed z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi --seed z6MkwPUeUS2fJMfc2HZN1RQTQcTTuhw4HhPySB8JeUg2mVvx
✓ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MknSL…StBU8Vi..
-
✗ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MkwPU…Ug2mVvx.. error: peer is not connected; cannot initiate fetch
+
! Warning: no addresses found for z6MkwPUeUS2fJMfc2HZN1RQTQcTTuhw4HhPySB8JeUg2mVvx, skipping..
✓ Fetched repository from 1 seed(s)
```
modified radicle-cli/examples/rad-sync.md
@@ -111,10 +111,10 @@ $ rad sync rad:z39mP9rQAaGmERfUMPULfPUi473tY
✗ Error: nothing to announce, repository rad:z39mP9rQAaGmERfUMPULfPUi473tY is not available locally
```

-
Or when trying to fetch from a seed you aren't connected to, using `--seed`:
+
Or when trying to fetch from an unknown seed, using `--seed`:
```
$ rad sync --fetch rad:z39mP9rQAaGmERfUMPULfPUi473tY --seed z6MkjM3HpqNVV4ZsL5s3RAd8ThVG3VG98YsDCjHBNnGMq5o7
-
✗ Fetching rad:z39mP9rQAaGmERfUMPULfPUi473tY from z6MkjM3…nGMq5o7.. error: peer is not connected; cannot initiate fetch
+
! Warning: no addresses found for z6MkjM3HpqNVV4ZsL5s3RAd8ThVG3VG98YsDCjHBNnGMq5o7, skipping..
✗ Error: repository fetch from 1 seed(s) failed
```

modified radicle-cli/src/commands/clone.rs
@@ -18,7 +18,6 @@ use radicle::node::{Handle as _, Node};
use radicle::prelude::*;
use radicle::rad;
use radicle::storage;
-
use radicle::storage::git::Storage;
use radicle::storage::RepositoryError;

use crate::commands::rad_checkout as checkout;
@@ -150,7 +149,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        options.sync.with_profile(&profile),
        &mut node,
        &signer,
-
        &profile.storage,
+
        &profile,
    )?;
    let delegates = doc
        .delegates()
@@ -226,6 +225,8 @@ pub enum CloneError {
    NotFound(RepoId),
    #[error("no seeds found for {0}")]
    NoSeeds(RepoId),
+
    #[error("fetch: {0}")]
+
    Fetch(#[from] sync::FetchError),
}

pub fn clone<G: Signer>(
@@ -235,7 +236,7 @@ pub fn clone<G: Signer>(
    settings: SyncSettings,
    node: &mut Node,
    signer: &G,
-
    storage: &Storage,
+
    profile: &Profile,
) -> Result<(raw::Repository, storage::git::Repository, Doc, Project), CloneError> {
    let me = *signer.public_key();

@@ -247,8 +248,8 @@ pub fn clone<G: Signer>(
        );
    }

-
    let results = sync::fetch(id, settings, node)?;
-
    let Ok(repository) = storage.repository(id) else {
+
    let results = sync::fetch(id, settings, node, profile)?;
+
    let Ok(repository) = profile.storage.repository(id) else {
        // If we don't have the repository locally, even after attempting to fetch,
        // there's nothing we can do.
        if results.is_empty() {
@@ -282,7 +283,7 @@ pub fn clone<G: Signer>(
        "Creating checkout in ./{}..",
        term::format::tertiary(path.display())
    ));
-
    let working = rad::checkout(id, &me, path, &storage)?;
+
    let working = rad::checkout(id, &me, path, &profile.storage)?;

    spinner.finish();

modified radicle-cli/src/commands/remote/add.rs
@@ -33,6 +33,7 @@ pub fn run(
                rid,
                SyncSettings::default().with_profile(profile),
                &mut node,
+
                profile,
            )?;
        }
    }
modified radicle-cli/src/commands/seed.rs
@@ -150,6 +150,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                        .timeout(timeout)
                        .with_profile(&profile),
                    &mut node,
+
                    &profile,
                )?;
            }
        }
modified radicle-cli/src/commands/sync.rs
@@ -8,6 +8,7 @@ use std::time;
use anyhow::{anyhow, Context as _};

use radicle::node;
+
use radicle::node::address::Store;
use radicle::node::AliasStore;
use radicle::node::Seed;
use radicle::node::{FetchResult, FetchResults, Handle as _, Node, SyncStatus};
@@ -286,7 +287,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                if !profile.policies()?.is_seeding(&rid)? {
                    anyhow::bail!("repository {rid} is not seeded");
                }
-
                let results = fetch(rid, settings.clone(), &mut node)?;
+
                let results = fetch(rid, settings.clone(), &mut node, &profile)?;
                let success = results.success().count();
                let failed = results.failed().count();

@@ -437,21 +438,50 @@ pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
    Ok(())
}

+
#[derive(Debug, thiserror::Error)]
+
pub enum FetchError {
+
    #[error(transparent)]
+
    Node(#[from] node::Error),
+
    #[error(transparent)]
+
    Db(#[from] node::db::Error),
+
    #[error(transparent)]
+
    Address(#[from] node::address::Error),
+
}
+

pub fn fetch(
    rid: RepoId,
    settings: SyncSettings,
    node: &mut Node,
-
) -> Result<FetchResults, node::Error> {
+
    profile: &Profile,
+
) -> Result<FetchResults, FetchError> {
    let local = node.nid()?;
-
    // Get seeds. This consults the local routing table only.
-
    let seeds = node.seeds(rid)?;
    let replicas = settings.replicas;
    let mut results = FetchResults::default();
-
    let (connected, mut disconnected) = seeds.partition();
+
    let db = profile.database()?;

-
    // Fetch from specified seeds.
+
    // Fetch from specified seeds, connecting to them if necessary.
    for nid in &settings.seeds {
-
        fetch_from(rid, nid, settings.timeout, &mut results, node)?;
+
        if node.session(*nid)?.is_some_and(|s| s.is_connected()) {
+
            fetch_from(rid, nid, settings.timeout, &mut results, node)?;
+
        } else {
+
            let addrs = db.addresses_of(nid)?;
+
            if addrs.is_empty() {
+
                results.push(
+
                    *nid,
+
                    FetchResult::Failed {
+
                        reason: format!("no addresses found in routing table for {nid}"),
+
                    },
+
                );
+
                term::warning(format!("no addresses found for {nid}, skipping.."));
+
            } else if connect(
+
                *nid,
+
                addrs.into_iter().map(|ka| ka.addr),
+
                settings.timeout,
+
                node,
+
            ) {
+
                fetch_from(rid, nid, settings.timeout, &mut results, node)?;
+
            }
+
        }
        // We are done when we either hit our replica count,
        // or fetch from all the specified seeds.
        if results.success().count() >= replicas {
@@ -466,6 +496,11 @@ pub fn fetch(
        }
    }

+
    // If we're here, we haven't met our sync targets, so consult the routing table
+
    // for more seeds to fetch from.
+
    let seeds = node.seeds(rid)?;
+
    let (connected, mut disconnected) = seeds.partition();
+

    // Fetch from connected seeds.
    let mut connected = connected
        .into_iter()
modified radicle-node/src/control.rs
@@ -131,6 +131,11 @@ where

            CommandResult::Okay(sessions).to_writer(writer)?;
        }
+
        Command::Session { nid } => {
+
            let session = handle.session(nid)?;
+

+
            CommandResult::Okay(session).to_writer(writer)?;
+
        }
        Command::Seed { rid, scope } => match handle.seed(rid, scope) {
            Ok(result) => {
                CommandResult::updated(result).to_writer(writer)?;
modified radicle-node/src/runtime/handle.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
+
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
use radicle::storage::refs::RefsAt;
use reactor::poller::popol::PopolWaker;
use serde_json::json;
@@ -276,16 +276,7 @@ impl radicle::node::Handle for Handle {
            let sessions = state
                .sessions()
                .iter()
-
                .map(|(nid, s)| radicle::node::Session {
-
                    nid: *nid,
-
                    link: if s.link.is_inbound() {
-
                        Link::Inbound
-
                    } else {
-
                        Link::Outbound
-
                    },
-
                    addr: s.addr.clone(),
-
                    state: s.state.clone(),
-
                })
+
                .map(|(_, s)| radicle::node::Session::from(s))
                .collect();
            sender.send(sessions).ok();

@@ -300,6 +291,23 @@ impl radicle::node::Handle for Handle {
        Ok(sessions)
    }

+
    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
+
            sender.send(session).ok();
+

+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        let sessions = receiver.recv()?;
+

+
        Ok(sessions)
+
    }
+

    fn shutdown(self) -> Result<(), Error> {
        // If the current value is `false`, set it to `true`, otherwise error.
        if self
modified radicle-node/src/service/session.rs
@@ -144,6 +144,21 @@ impl fmt::Display for Session {
    }
}

+
impl From<&Session> for radicle::node::Session {
+
    fn from(s: &Session) -> Self {
+
        Self {
+
            nid: s.id,
+
            link: if s.link.is_inbound() {
+
                radicle::node::Link::Inbound
+
            } else {
+
                radicle::node::Link::Outbound
+
            },
+
            addr: s.addr.clone(),
+
            state: s.state.clone(),
+
        }
+
    }
+
}
+

impl Session {
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
        Self {
modified radicle-node/src/test/handle.rs
@@ -112,6 +112,10 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

+
    fn session(&self, _node: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
+
        unimplemented!()
+
    }
+

    fn shutdown(self) -> Result<(), Self::Error> {
        Ok(())
    }
modified radicle/src/node.rs
@@ -567,6 +567,9 @@ pub enum Command {
    /// Get the current peer sessions.
    Sessions,

+
    /// Get a specific peer session.
+
    Session { nid: NodeId },
+

    /// Fetch the given repository from the network.
    #[serde(rename_all = "camelCase")]
    Fetch {
@@ -1007,6 +1010,8 @@ pub trait Handle: Clone + Sync + Send {
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the peer session state.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
+
    /// Query the state of a peer session. Returns [`None`] if no session was found.
+
    fn session(&self, node: NodeId) -> Result<Option<Session>, Self::Error>;
    /// Subscribe to node events.
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
    /// Return debug information as a JSON value.
@@ -1322,6 +1327,15 @@ impl Handle for Node {
        Ok(sessions)
    }

+
    fn session(&self, nid: NodeId) -> Result<Option<Session>, Error> {
+
        let session = self
+
            .call::<Option<Session>>(Command::Session { nid }, DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse)??;
+

+
        Ok(session)
+
    }
+

    fn debug(&self) -> Result<json::Value, Self::Error> {
        let debug = self
            .call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?