Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Better differentiate fetch results
Alexis Sellier committed 3 years ago
commit 4b2bc365efbbd723f33031e467d85fc36e73ad5b
parent 636d9e55eeb61ecfe712c69d3d4a313321403245
2 files changed +65 -56
modified radicle-node/src/wire/protocol.rs
@@ -33,7 +33,7 @@ use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
use crate::worker;
-
use crate::worker::{ChannelEvent, Fetch, Task, TaskResult};
+
use crate::worker::{ChannelEvent, FetchRequest, FetchResult, Task, TaskResult};
use crate::Link;
use crate::{address, service};

@@ -343,11 +343,11 @@ where
    fn worker_result(&mut self, task: TaskResult) {
        log::debug!(
            target: "wire",
-
            "Received fetch result from worker: stream={} remote={} fetch={:?} result={:?}",
-
            task.stream, task.fetch.remote(), task.result, task.fetch
+
            "Received fetch result from worker: stream={} remote={} result={:?}",
+
            task.stream, task.remote, task.result
        );

-
        let nid = task.fetch.remote();
+
        let nid = task.remote;
        let Some((fd, peer)) = self
            .peers
            .iter_mut()
@@ -364,8 +364,17 @@ where
        let remote = *nid;

        // Only call into the service if we initiated this fetch.
-
        if let Some((rid, namespaces)) = task.fetch.initiated() {
-
            self.service.fetched(rid, namespaces, remote, task.result);
+
        match task.result {
+
            FetchResult::Initiator {
+
                rid,
+
                namespaces,
+
                result,
+
            } => {
+
                self.service.fetched(rid, namespaces, remote, result);
+
            }
+
            FetchResult::Responder { .. } => {
+
                // We don't do anything with upload results for now.
+
            }
        }

        // Nb. It's possible that the stream would already be unregistered if we received an early
@@ -527,7 +536,7 @@ where
                                };

                                let task = Task {
-
                                    fetch: Fetch::Responder { remote: *nid },
+
                                    fetch: FetchRequest::Responder { remote: *nid },
                                    stream,
                                    channels,
                                };
@@ -812,7 +821,7 @@ where

                    let link = *link;
                    let task = Task {
-
                        fetch: Fetch::Initiator {
+
                        fetch: FetchRequest::Initiator {
                            rid,
                            namespaces,
                            remote,
modified radicle-node/src/worker.rs
@@ -9,13 +9,12 @@ use std::{env, io, net, process, thread, time};

use crossbeam_channel as chan;

-
use radicle::identity::{Id, IdentityError};
+
use radicle::identity::Id;
use radicle::prelude::NodeId;
use radicle::storage::{Namespaces, ReadRepository, RefUpdate};
use radicle::{git, Storage};

use crate::runtime::Handle;
-
use crate::storage;
use crate::wire::StreamId;
use channels::{ChannelReader, ChannelWriter};
use tunnel::Tunnel;
@@ -44,20 +43,8 @@ pub enum FetchError {
    #[error("the 'git fetch' command failed with exit code '{code}'")]
    CommandFailed { code: i32 },
    #[error(transparent)]
-
    Git(#[from] git::raw::Error),
-
    #[error(transparent)]
-
    Storage(#[from] storage::Error),
-
    #[error(transparent)]
-
    Fetch(#[from] storage::FetchError),
-
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error(transparent)]
-
    Identity(#[from] IdentityError),
-
    #[error("upload failed: {0}")]
-
    Upload(#[from] UploadError),
-
    #[error("worker channel error: {0}")]
-
    Channel(#[from] chan::SendError<ChannelEvent>),
-
    #[error(transparent)]
    StagingInit(#[from] fetch::error::Init),
    #[error(transparent)]
    StagingTransition(#[from] fetch::error::Transition),
@@ -92,7 +79,7 @@ impl UploadError {

/// Fetch job sent to worker thread.
#[derive(Debug, Clone)]
-
pub enum Fetch {
+
pub enum FetchRequest {
    /// Client is initiating a fetch in order to receive the specified
    /// `refspecs` determined by [`Namespaces`].
    Initiator {
@@ -111,27 +98,35 @@ pub enum Fetch {
    },
}

-
impl Fetch {
+
impl FetchRequest {
    pub fn remote(&self) -> NodeId {
        match self {
            Self::Initiator { remote, .. } | Self::Responder { remote } => *remote,
        }
    }
+
}

-
    pub fn initiated(self) -> Option<(Id, Namespaces)> {
-
        match self {
-
            Self::Initiator {
-
                rid, namespaces, ..
-
            } => Some((rid, namespaces)),
-
            Self::Responder { .. } => None,
-
        }
-
    }
+
/// Fetch result of an upload or fetch.
+
#[derive(Debug)]
+
pub enum FetchResult {
+
    Initiator {
+
        /// Repo fetched.
+
        rid: Id,
+
        /// Namespaces fetched.
+
        namespaces: Namespaces,
+
        /// Fetch result.
+
        result: Result<Vec<RefUpdate>, FetchError>,
+
    },
+
    Responder {
+
        /// Upload result.
+
        result: Result<(), UploadError>,
+
    },
}

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
pub struct Task {
-
    pub fetch: Fetch,
+
    pub fetch: FetchRequest,
    pub stream: StreamId,
    pub channels: Channels,
}
@@ -139,9 +134,9 @@ pub struct Task {
/// Worker response.
#[derive(Debug)]
pub struct TaskResult {
-
    pub fetch: Fetch,
+
    pub remote: NodeId,
+
    pub result: FetchResult,
    pub stream: StreamId,
-
    pub result: Result<Vec<RefUpdate>, FetchError>,
}

/// A worker that replicates git objects.
@@ -172,14 +167,15 @@ impl Worker {
            channels,
            stream,
        } = task;
-
        let result = self._process(&fetch, stream, channels);
+
        let remote = fetch.remote();
+
        let result = self._process(fetch, stream, channels);

        log::trace!(target: "worker", "Sending response back to service..");

        if self
            .handle
            .worker_result(TaskResult {
-
                fetch,
+
                remote,
                stream,
                result,
            })
@@ -191,36 +187,41 @@ impl Worker {

    fn _process(
        &mut self,
-
        fetch: &Fetch,
+
        fetch: FetchRequest,
        stream: StreamId,
        mut channels: Channels,
-
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        match &fetch {
-
            Fetch::Initiator {
+
    ) -> FetchResult {
+
        match fetch {
+
            FetchRequest::Initiator {
                rid,
                namespaces,
                remote,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
+
                let result = self.fetch(rid, remote, stream, &namespaces, channels);

-
                self.fetch(*rid, *remote, stream, namespaces, channels)
+
                FetchResult::Initiator {
+
                    rid,
+
                    namespaces,
+
                    result,
+
                }
            }
-
            Fetch::Responder { .. } => {
+
            FetchRequest::Responder { remote } => {
                log::debug!(target: "worker", "Worker processing incoming fetch..");

                let (stream_w, stream_r) = channels.split();
                // Nb. two fetches are usually expected: one for the *special* refs,
                // followed by another for the signed refs.
-
                loop {
-
                    match self.upload_pack(fetch, stream, stream_r, stream_w) {
+
                let result = loop {
+
                    match self.upload_pack(remote, stream, stream_r, stream_w) {
                        Ok(ControlFlow::Continue(())) => continue,
-
                        Ok(ControlFlow::Break(())) => break,
-
                        Err(e) => return Err(e.into()),
+
                        Ok(ControlFlow::Break(rid)) => break Ok(rid),
+
                        Err(e) => break Err(e),
                    }
-
                }
+
                };
                log::debug!(target: "worker", "Upload process on stream {stream} exited successfully");

-
                Ok(vec![])
+
                FetchResult::Responder { result }
            }
        }
    }
@@ -280,21 +281,20 @@ impl Worker {

    fn upload_pack(
        &mut self,
-
        fetch: &Fetch,
+
        remote: NodeId,
        stream: StreamId,
        stream_r: &mut ChannelReader,
        stream_w: &mut ChannelWriter,
    ) -> Result<ControlFlow<()>, UploadError> {
-
        log::debug!(target: "worker", "Waiting for Git request pktline from {}..", fetch.remote());
+
        log::debug!(target: "worker", "Waiting for Git request pktline from {remote}..");

-
        // Read the request packet line to make sure the repository being requested matches what
-
        // we expect, and that the service requested is valid.
+
        // Read the request packet line to know what repository we're uploading.
        let (rid, request) = match pktline::Reader::new(stream_r).read_request_pktline() {
            Ok((req, pktline)) => (req.repo, pktline),
            Err(err) if err.kind() == io::ErrorKind::ConnectionReset => {
                log::debug!(
                    target: "worker",
-
                    "Upload process received stream `close` from {}", fetch.remote()
+
                    "Upload process received stream `close` from {remote}"
                );
                return Ok(ControlFlow::Break(()));
            }
@@ -304,9 +304,9 @@ impl Worker {
        };
        log::debug!(target: "worker", "Received Git request pktline for {rid}..");

-
        match self._upload_pack(rid, fetch.remote(), request, stream, stream_r, stream_w) {
+
        match self._upload_pack(rid, remote, request, stream, stream_r, stream_w) {
            Ok(()) => {
-
                log::debug!(target: "worker", "Upload of {rid} to {} exited successfully", fetch.remote());
+
                log::debug!(target: "worker", "Upload of {rid} to {remote} exited successfully");

                Ok(ControlFlow::Continue(()))
            }