Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement 'fetch' command properly
Alexis Sellier committed 3 years ago
commit ef2d09114ab602cce8d86f7f1dfb9658c8938270
parent 7be6fe341459b1a0d666e6086ae83d083173bba8
1 file changed +89 -17
modified node/src/protocol.rs
@@ -6,12 +6,14 @@ pub mod peer;
use std::ops::{Deref, DerefMut};
use std::{collections::VecDeque, fmt, io, net, net::IpAddr};

+
use crossbeam_channel as chan;
use fastrand::Rng;
use git_url::Url;
use log::*;
use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
use nakamoto_net::{Io, Link};
+
use nonempty::NonEmpty;

use crate::address_book;
use crate::address_book::AddressBook;
@@ -49,11 +51,40 @@ pub type Timestamp = u64;
#[derive(Debug, Clone)]
pub enum Event {}

+
/// Error returned by [`Command::Fetch`].
+
#[derive(thiserror::Error, Debug)]
+
pub enum FetchError {
+
    #[error(transparent)]
+
    Git(#[from] git2::Error),
+
}
+

+
/// Result of looking up providers in our routing table.
+
#[derive(Debug)]
+
pub enum FetchLookup {
+
    Found {
+
        providers: NonEmpty<net::SocketAddr>,
+
        results: chan::Receiver<FetchResult>,
+
    },
+
    NotFound,
+
}
+

+
/// Result of a fetch request from a specific provider.
+
#[derive(Debug)]
+
pub enum FetchResult {
+
    /// Successful fetch from a provider.
+
    Fetched { from: net::SocketAddr },
+
    /// Error fetching the resource from a provider.
+
    Error {
+
        from: net::SocketAddr,
+
        error: FetchError,
+
    },
+
}
+

/// Commands sent to the protocol by the operator.
#[derive(Debug)]
pub enum Command {
    Connect(net::SocketAddr),
-
    Fetch(ProjId, net::SocketAddr),
+
    Fetch(ProjId, chan::Sender<FetchLookup>),
    AnnounceRefsUpdate(ProjId),
}

@@ -110,9 +141,13 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
        }
    }

-
    pub fn providers(&self, proj: &ProjId) -> Box<dyn Iterator<Item = &Peer> + '_> {
+
    pub fn providers(&self, proj: &ProjId) -> Box<dyn Iterator<Item = (&NodeId, &Peer)> + '_> {
        if let Some(peers) = self.routing.get(proj) {
-
            Box::new(peers.iter().filter_map(|id| self.peers.by_id(id)))
+
            Box::new(
+
                peers
+
                    .iter()
+
                    .filter_map(|id| self.peers.by_id(id).map(|p| (id, p))),
+
            )
        } else {
            Box::new(std::iter::empty())
        }
@@ -219,7 +254,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
    fn get_inventories(&mut self) -> Result<(), storage::Error> {
        let mut msgs = Vec::new();
        for proj in self.tracked()? {
-
            for peer in self.providers(&proj) {
+
            for (_, peer) in self.providers(&proj) {
                if peer.is_negotiated() {
                    msgs.push((
                        peer.addr,
@@ -321,19 +356,56 @@ where

        match cmd {
            Command::Connect(addr) => self.context.connect(addr),
-
            Command::Fetch(proj, remote) => {
-
                self.storage
-
                    .repository(&proj)
-
                    .unwrap()
-
                    .fetch(&Url {
-
                        scheme: git_url::Scheme::Git,
-
                        host: Some(remote.ip().to_string()),
-
                        port: Some(remote.port()),
-
                        // TODO: Fix upstream crate so that it adds a `/` when needed.
-
                        path: format!("/{}", proj).into(),
-
                        ..Url::default()
-
                    })
-
                    .unwrap();
+
            Command::Fetch(proj, resp) => {
+
                let providers = self.providers(&proj).collect::<Vec<_>>();
+

+
                if let Some(providers) = NonEmpty::from_vec(providers) {
+
                    log::debug!("Found {} providers for {}", providers.len(), proj);
+

+
                    let (results_, results) = chan::bounded(providers.len());
+
                    {
+
                        let (_, head) = &providers.head;
+
                        let tail = providers
+
                            .tail()
+
                            .iter()
+
                            .map(|(_, peer)| peer.addr)
+
                            .collect::<Vec<_>>();
+

+
                        resp.send(FetchLookup::Found {
+
                            providers: NonEmpty::from((head.addr, tail)),
+
                            results,
+
                        })
+
                        .ok();
+
                    }
+

+
                    let mut repo = self.storage.repository(&proj).unwrap();
+
                    // TODO: Limit the number of providers we fetch from? Randomize?
+
                    for (_, peer) in providers {
+
                        match repo.fetch(&Url {
+
                            scheme: git_url::Scheme::Git,
+
                            host: Some(peer.addr.ip().to_string()),
+
                            port: Some(peer.addr.port()),
+
                            // TODO: Fix upstream crate so that it adds a `/` when needed.
+
                            path: format!("/{}", proj).into(),
+
                            ..Url::default()
+
                        }) {
+
                            Ok(()) => {
+
                                results_.send(FetchResult::Fetched { from: peer.addr }).ok();
+
                            }
+
                            Err(err) => {
+
                                results_
+
                                    .send(FetchResult::Error {
+
                                        from: peer.addr,
+
                                        error: err.into(),
+
                                    })
+
                                    .ok();
+
                            }
+
                        }
+
                    }
+
                } else {
+
                    log::error!("No providers found for {}", proj);
+
                    resp.send(FetchLookup::NotFound).ok();
+
                }
            }
            Command::AnnounceRefsUpdate(proj) => {
                let user = *self.storage.user_id();