Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Replication fixes
Alexis Sellier committed 3 years ago
commit f66ecd4ece5b48ffbe4481aa905a43ad35b6a2a4
parent 803d36f0f61fb13e9ff4920f9fb82a98c17f4735
4 files changed +65 -72
modified radicle-node/src/service.rs
@@ -26,7 +26,7 @@ use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node;
-
use crate::node::{Address, Features};
+
use crate::node::{Address, Features, FetchResult};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
@@ -34,7 +34,6 @@ use crate::service::session::Protocol;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::storage::{Namespaces, ReadStorage};
-
use crate::worker;
use crate::worker::FetchError;
use crate::Link;

@@ -109,7 +108,7 @@ pub enum Command {
    /// Lookup seeds for the given repository in the routing table.
    Seeds(Id, chan::Sender<Vec<NodeId>>),
    /// Fetch the given repository from the network.
-
    Fetch(Id, NodeId, chan::Sender<node::FetchResult>),
+
    Fetch(Id, NodeId, chan::Sender<FetchResult>),
    /// Track the given repository.
    TrackRepo(Id, chan::Sender<bool>),
    /// Untrack the given repository.
@@ -176,7 +175,7 @@ pub struct Service<R, A, S, G> {
    /// Whether our local inventory no longer represents what we have announced to the network.
    out_of_sync: bool,
    /// Fetch requests initiated by user, which are waiting for results.
-
    fetch_reqs: HashMap<Id, chan::Sender<node::FetchResult>>,
+
    fetch_reqs: HashMap<Id, chan::Sender<FetchResult>>,
    /// Current tracked repository bloom filter.
    filter: Filter,
    /// Last time the service was idle.
@@ -488,43 +487,45 @@ where
        }
    }

-
    pub fn fetched(&mut self, result: worker::FetchResult) {
-
        let remote = result.fetch.remote;
-
        let rid = result.fetch.rid;
-
        let initiated = result.fetch.initiated;
+
    pub fn fetched(&mut self, fetch: Fetch, result: Result<Vec<RefUpdate>, FetchError>) {
+
        let remote = fetch.remote;
+
        let rid = fetch.rid;
+
        let initiated = fetch.initiated;

        if initiated {
-
            log::debug!(
-
                target: "service",
-
                "Fetched {rid} {remote} (error={:?})", result.result.as_ref().err()
-
            );
-
            let result = match result.result {
+
            let result = match result {
                Ok(updated) => {
+
                    log::debug!(target: "service", "Fetched {rid} from {remote}");
+

                    self.reactor.event(Event::RefsFetched {
                        remote,
                        rid,
                        updated: updated.clone(),
                    });
-
                    Ok(updated)
+
                    FetchResult::Success { updated }
                }
                Err(err) => {
-
                    error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");
+
                    let reason = err.to_string();
+
                    error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");

-
                    if let FetchError::Io(_) = err {
+
                    // For now, we only disconnect the remote in case of timeout. In the future,
+
                    // there may be other reasons to disconnect.
+
                    if err.is_timeout() {
                        self.reactor
                            .disconnect(remote, DisconnectReason::Fetch(err));
-
                        return;
-
                    } else {
-
                        Err(err)
                    }
+
                    FetchResult::Failed { reason }
                }
            };

            if let Some(results) = self.fetch_reqs.get(&rid) {
                log::debug!(target: "service", "Found existing fetch request, sending result..");

-
                if results.send(node::FetchResult::from(result)).is_err() {
+
                if results.send(result).is_err() {
                    log::error!(target: "service", "Error sending fetch result for {rid}..");
+
                    // FIXME: We should remove the channel even on success, once all seeds
+
                    // were fetched from. Otherwise an organic fetch will try to send on the
+
                    // channel.
                    self.fetch_reqs.remove(&rid);
                } else {
                    log::debug!(target: "service", "Sent fetch result for {rid}..");
modified radicle-node/src/test/simulator.rs
@@ -16,11 +16,11 @@ use crate::crypto::Signer;
use crate::git::raw as git;
use crate::prelude::Address;
use crate::service::reactor::Io;
-
use crate::service::{DisconnectReason, Event, Message, NodeId};
+
use crate::service::{DisconnectReason, Event, Fetch, Message, NodeId};
use crate::storage::{Namespaces, RefUpdate};
use crate::storage::{WriteRepository, WriteStorage};
use crate::test::peer::Service;
-
use crate::worker::{FetchError, FetchResult};
+
use crate::worker::FetchError;
use crate::Link;

/// Minimum latency between peers.
@@ -63,7 +63,7 @@ pub enum Input {
    /// Received a message from a remote peer.
    Received(NodeId, Vec<Message>),
    /// Fetch completed for a node.
-
    Fetched(Arc<FetchResult>),
+
    Fetched(Fetch, Rc<Result<Vec<RefUpdate>, FetchError>>),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}
@@ -108,11 +108,11 @@ impl fmt::Display for Scheduled {
            Input::Wake => {
                write!(f, "{}: Tock", self.node)
            }
-
            Input::Fetched(result) => {
+
            Input::Fetched(fetch, _) => {
                write!(
                    f,
-
                    "{} <~ {} ({}): FetchCompleted",
-
                    self.node, result.fetch.remote, result.fetch.rid
+
                    "{} <~ {} ({}): Fetched",
+
                    self.node, fetch.remote, fetch.rid
                )
            }
        }
@@ -411,17 +411,13 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            p.received_message(id, msg);
                        }
                    }
-
                    Input::Fetched(result) => {
-
                        let result = Arc::try_unwrap(result).unwrap();
-
                        let mut repo = p.storage().repository(result.fetch.rid).unwrap();
-

-
                        fetch(
-
                            &mut repo,
-
                            &result.fetch.remote,
-
                            result.fetch.namespaces.clone(),
-
                        )
-
                        .unwrap();
-
                        p.fetched(result);
+
                    Input::Fetched(f, result) => {
+
                        let result = Rc::try_unwrap(result).unwrap();
+
                        let mut repo = p.storage().repository(f.rid).unwrap();
+

+
                        fetch(&mut repo, &f.remote, f.namespaces.clone()).unwrap();
+

+
                        p.fetched(f, result);
                    }
                }
                for o in p.by_ref() {
@@ -621,10 +617,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        Scheduled {
                            node,
                            remote: fetch.remote,
-
                            input: Input::Fetched(Arc::new(FetchResult {
+
                            input: Input::Fetched(
                                fetch,
-
                                result: Err(FetchError::Io(io::ErrorKind::Other.into())),
-
                            })),
+
                                Rc::new(Err(FetchError::Io(io::ErrorKind::Other.into()))),
+
                            ),
                        },
                    );
                } else {
@@ -633,10 +629,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        Scheduled {
                            node,
                            remote: fetch.remote,
-
                            input: Input::Fetched(Arc::new(FetchResult {
-
                                fetch,
-
                                result: Ok(vec![]),
-
                            })),
+
                            input: Input::Fetched(fetch, Rc::new(Ok(vec![]))),
                        },
                    );
                }
modified radicle-node/src/wire/protocol.rs
@@ -354,7 +354,7 @@ where
        peer.downgrade();

        self.actions.push_back(Action::RegisterTransport(session));
-
        self.service.fetched(task.result);
+
        self.service.fetched(task.fetch, task.result);
    }
}

modified radicle-node/src/worker.rs
@@ -1,5 +1,4 @@
use std::io::{prelude::*, BufReader};
-
use std::ops::Deref;
use std::thread::JoinHandle;
use std::{env, io, net, process, thread, time};

@@ -35,21 +34,6 @@ pub struct Config {
    pub storage: Storage,
}

-
/// Result of a fetch request from a specific seed.
-
#[derive(Debug)]
-
pub struct FetchResult {
-
    pub fetch: Fetch,
-
    pub result: Result<Vec<RefUpdate>, FetchError>,
-
}
-

-
impl Deref for FetchResult {
-
    type Target = Result<Vec<RefUpdate>, FetchError>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.result
-
    }
-
}
-

/// Error returned by fetch.
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
@@ -65,6 +49,13 @@ pub enum FetchError {
    Project(#[from] storage::ProjectError),
}

+
impl FetchError {
+
    /// Check if it's a timeout error.
+
    pub fn is_timeout(&self) -> bool {
+
        matches!(self, FetchError::Io(e) if e.kind() == io::ErrorKind::TimedOut)
+
    }
+
}
+

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
pub struct Task<G: Signer + EcSign> {
@@ -75,7 +66,8 @@ pub struct Task<G: Signer + EcSign> {

/// Worker response.
pub struct TaskResult<G: Signer + EcSign> {
-
    pub result: FetchResult,
+
    pub fetch: Fetch,
+
    pub result: Result<Vec<RefUpdate>, FetchError>,
    pub session: WireSession<G>,
}

@@ -108,12 +100,15 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        } = task;

        let (session, result) = self._process(&fetch, drain, session);
-
        let result = FetchResult { fetch, result };
        log::debug!(target: "worker", "Sending response back to service..");

        if self
            .handle
-
            .worker_result(TaskResult { result, session })
+
            .worker_result(TaskResult {
+
                fetch,
+
                result,
+
                session,
+
            })
            .is_err()
        {
            log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
@@ -136,11 +131,12 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            let result = self.fetch(fetch, &mut tunnel);
            let mut session = tunnel.into_session();

-
            if let Err(err) = pktline::done(&mut session) {
-
                log::error!(target: "worker", "Fetch error: {err}");
-
            }
+
            // If there are no errors, send a `done` special packet. We don't send this on error,
+
            // as the remote will not be expecting it.
            if let Err(err) = &result {
                log::error!(target: "worker", "Fetch error: {err}");
+
            } else if let Err(err) = pktline::done(&mut session) {
+
                log::error!(target: "worker", "Fetch error: {err}");
            }
            (session, result)
        } else {
@@ -237,12 +233,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        stream_r: &mut WireReader,
        stream_w: &mut WireWriter<G>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        // Connect to our local git daemon, running as a child process.
-
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)?;
-
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
        let mut stream_r = pktline::Reader::new(drain, stream_r);
-
        let mut daemon_r = pktline::Reader::new(vec![], &mut daemon_r);
-
        let mut buffer = [0; u16::MAX as usize + 1];

        // Read the request packet line to make sure the repository being requested matches what
        // we expect, and that the service requested is valid.
@@ -265,11 +256,19 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
                ))));
            }
        };
+

+
        // Connect to our local git daemon, running as a child process.
+
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)?;
+
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
+
        let mut daemon_r = pktline::Reader::new(vec![], &mut daemon_r);
+

        // Write the raw request to the daemon, once we've verified it.
        daemon_w.write_all(&request)?;

        // We now loop, alternating between reading requests from the client, and writing responses
        // back from the daemon. Requests are delimited with a flush packet (`flush-pkt`).
+
        let mut buffer = [0; u16::MAX as usize + 1];
+

        loop {
            if let Err(e) = daemon_r.read_pktlines(stream_w, &mut buffer) {
                // This is the expected error when the remote disconnects.