Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
fetch: improve the fetch protocol
Merged fintohaps opened 2 years ago

Logging the stages of the fetch protocol is added to get some insight of how far a fetch gets, while also providing some simple timings using Instant::elapsed.

This allowed me to run a local client version of this fetch protocol and see what was happening. The 9s timeout was hit for the channel methods. So this was increased to 30s.

From there, I was able to see that a rad clone of an offending repository was actually “finishing” but the final error would say that the other side had disconnected. This lead to the realisation that the call to done can be a fire and forget – log the error and continue on with validation and refdb updates. I was then able to confirm that the clone of the offending repository was able to be performed.

8 files changed +83 -25 87d1cb50 b1435629
modified radicle-fetch/src/lib.rs
@@ -9,12 +9,15 @@ mod refs;
mod stage;
mod state;

+
use gix_protocol::handshake;
pub use handle::Handle;
pub use policy::{Allowed, BlockList, Scope};
+
use radicle::storage::ReadRepository as _;
pub use state::{FetchLimit, FetchResult};
pub use transport::Transport;

use std::io;
+
use std::time::Instant;

use radicle::crypto::PublicKey;
use radicle::storage::refs::RefsAt;
@@ -55,21 +58,27 @@ pub fn pull<S>(
where
    S: transport::ConnectionStream,
{
+
    let start = Instant::now();
    let local = *handle.local();
    if local == remote {
        return Err(Error::ReplicateSelf);
    }
-
    let handshake = handle
-
        .transport
-
        .handshake()
-
        .map_err(|err| Error::Handshake { err })?;
+
    let handshake = perform_handshake(handle)?;
    let state = FetchState::default();

    // N.b. ensure that we ignore the local peer's key.
    handle.blocked.extend([local]);
-
    state
+
    let result = state
        .run(handle, &handshake, limit, remote, refs_at)
-
        .map_err(Error::Protocol)
+
        .map_err(Error::Protocol);
+

+
    log::debug!(
+
        target: "fetch",
+
        "finished pull of {} ({}ms)",
+
        handle.repo.id(),
+
        start.elapsed().as_millis()
+
    );
+
    result
}

/// Clone changes from the `remote`.
@@ -84,15 +93,31 @@ pub fn clone<S>(
where
    S: transport::ConnectionStream,
{
+
    let start = Instant::now();
    if *handle.local() == remote {
        return Err(Error::ReplicateSelf);
    }
-
    let handshake = handle
-
        .transport
-
        .handshake()
-
        .map_err(|err| Error::Handshake { err })?;
+
    let handshake = perform_handshake(handle)?;
    let state = FetchState::default();
-
    state
+
    let result = state
        .run(handle, &handshake, limit, remote, None)
-
        .map_err(Error::Protocol)
+
        .map_err(Error::Protocol);
+

+
    log::debug!(
+
        target: "fetch",
+
        "finished clone of {} ({}ms)",
+
        handle.repo.id(),
+
        start.elapsed().as_millis(),
+
    );
+
    result
+
}
+

+
fn perform_handshake<S>(handle: &mut Handle<S>) -> Result<handshake::Outcome, Error>
+
where
+
    S: transport::ConnectionStream,
+
{
+
    handle.transport.handshake().map_err(|err| {
+
        log::warn!(target: "fetch", "failed to perform handshake: {err}");
+
        Error::Handshake { err }
+
    })
}
modified radicle-fetch/src/state.rs
@@ -1,4 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
+
use std::time::Instant;

use gix_protocol::handshake;
use radicle::crypto::PublicKey;
@@ -378,6 +379,7 @@ impl FetchState {
    where
        S: transport::ConnectionStream,
    {
+
        let start = Instant::now();
        // N.b. we always fetch the `rad/id` since our delegate set
        // might be further ahead than theirs, e.g. we are the
        // deciding vote on adding a delegate.
@@ -389,6 +391,7 @@ impl FetchState {
                limit: limit.special,
            },
        )?;
+
        log::debug!(target: "fetch", "fetched rad/id ({}ms)", start.elapsed().as_millis());

        // N.b. The error case here should not happen. In the case of
        // a `clone` we have asked for refs/rad/id and ensured it was
@@ -418,6 +421,12 @@ impl FetchState {
            remote,
            refs_at,
        )?;
+
        log::debug!(
+
            target: "fetch",
+
            "fetched rad/sigrefs for {} remotes ({}ms)",
+
            signed_refs.len(),
+
            start.elapsed().as_millis()
+
        );

        let data_refs = stage::DataRefs {
            remote,
@@ -425,6 +434,22 @@ impl FetchState {
            limit: limit.refs,
        };
        self.run_stage(handle, handshake, &data_refs)?;
+
        log::debug!(
+
            target: "fetch",
+
            "fetched data refs for {} remotes ({}ms)",
+
            data_refs.remotes.len(),
+
            start.elapsed().as_millis()
+
        );
+

+
        // N.b. signal to exit the upload-pack sequence
+
        // We're finished fetching on this side, and all that's left
+
        // is validation.
+
        match handle.transport.done() {
+
            Ok(()) => log::debug!(target: "fetch", "sent done signal to remote"),
+
            Err(err) => {
+
                log::warn!(target: "fetch", "attempted to send done to remote {remote}: {err}")
+
            }
+
        }

        // Run validation of signed refs, pruning any offending
        // remotes from the tips, thus not updating the production Git
@@ -528,9 +553,12 @@ impl FetchState {
                }
            }
        }
-

-
        // N.b. signal to exit the upload-pack sequence
-
        handle.transport.done()?;
+
        log::debug!(
+
            target: "fetch",
+
            "validated {} remotes ({}ms)",
+
            remotes.len(),
+
            start.elapsed().as_millis()
+
        );

        // N.b. only apply to Git repository if no delegates have failed verification.
        if failures.is_empty() {
@@ -541,12 +569,20 @@ impl FetchState {
                    .into_values()
                    .flat_map(|ups| ups.into_iter()),
            )?;
+
            log::debug!(target: "fetch", "applied updates ({}ms)", start.elapsed().as_millis());
            Ok(FetchResult::Success {
                applied,
                remotes,
                warnings,
            })
        } else {
+
            log::debug!(
+
                target: "fetch",
+
                "fetch failed {} warnings and {} failures ({}ms)",
+
                warnings.len(),
+
                failures.len(),
+
                start.elapsed().as_millis()
+
            );
            Ok(FetchResult::Failed { warnings, failures })
        }
    }
modified radicle-node/src/runtime.rs
@@ -4,7 +4,7 @@ pub mod thread;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-
use std::{fs, io, net, time};
+
use std::{fs, io, net};

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -262,7 +262,6 @@ impl Runtime {
            cobs_cache,
            worker::Config {
                capacity: 8,
-
                timeout: time::Duration::from_secs(9),
                storage: storage.clone(),
                fetch,
                policy,
modified radicle-node/src/service.rs
@@ -1268,7 +1268,7 @@ where
            match self.db.addresses().get(announcer) {
                Ok(node) => {
                    if node.is_none() {
-
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer}");
+
                        trace!(target: "service", "Ignoring announcement from unknown node {announcer}");
                        return Ok(false);
                    }
                }
@@ -1600,7 +1600,7 @@ where
            trace!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
        }
-
        message.log(log::Level::Debug, remote, Link::Inbound);
+
        message.log(log::Level::Trace, remote, Link::Inbound);

        trace!(target: "service", "Received message {:?} from {}", &message, peer.id);

modified radicle-node/src/service/io.rs
@@ -57,7 +57,7 @@ impl Outbox {
    }

    pub fn write(&mut self, remote: &Session, msg: Message) {
-
        msg.log(log::Level::Debug, &remote.id, Link::Outbound);
+
        msg.log(log::Level::Trace, &remote.id, Link::Outbound);
        trace!(target: "service", "Write {:?} to {}", &msg, remote);

        self.io.push_back(Io::Write(remote.id, vec![msg]));
@@ -101,7 +101,7 @@ impl Outbox {
                ix + 1,
                msgs.len()
            );
-
            msg.log(log::Level::Debug, &remote.id, Link::Outbound);
+
            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
        }
        self.io.push_back(Io::Write(remote.id, msgs));
    }
modified radicle-node/src/wire/protocol.rs
@@ -44,7 +44,7 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {

/// Default time to wait to receive something from a worker channel. Applies to
/// workers waiting for data from remotes as well.
-
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(9);
+
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(30);

/// Default time to wait until a network connection is considered inactive.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(30);
modified radicle-node/src/worker.rs
@@ -29,8 +29,6 @@ pub use channels::{ChannelEvent, Channels};
pub struct Config {
    /// Number of worker threads.
    pub capacity: usize,
-
    /// Timeout for all operations.
-
    pub timeout: time::Duration,
    /// Git storage.
    pub storage: Storage,
    /// Configuration for performing fetched.
modified radicle/src/node.rs
@@ -46,7 +46,7 @@ pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout when waiting for the node to respond with data.
-
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9);
+
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.