Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Refactor fetch response code
Alexis Sellier committed 3 years ago
commit ec2e7d7ee167776ca8d50b873b0da95ee6a68d28
parent e6f56ff3bc89366c3801b27cfc49bce75dc986c6
5 files changed +72 -75
modified radicle-node/src/control.rs
@@ -13,7 +13,6 @@ use crate::client;
use crate::identity::Id;
use crate::node;
use crate::service::FetchLookup;
-
use crate::service::FetchResult;

#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -242,19 +241,19 @@ fn fetch<W: Write, H: Handle<Error = client::handle::Error, FetchLookup = FetchL
            )?;

            for result in results.iter() {
-
                match result {
-
                    FetchResult::Fetched { from, updated } => {
-
                        writeln!(writer, "ok: {} fetched from {}", &id, from)?;
+
                match result.result {
+
                    Ok(updated) => {
+
                        writeln!(writer, "ok: {} fetched from {}", &id, result.remote)?;

                        for update in updated {
                            writeln!(writer, "{}", update)?;
                        }
                    }
-
                    FetchResult::Error { from, error } => {
+
                    Err(err) => {
                        writeln!(
                            writer,
                            "error: {} failed to fetch from {}: {}",
-
                            &id, from, error
+
                            &id, result.remote, err
                        )?;
                    }
                }
modified radicle-node/src/service.rs
@@ -131,24 +131,10 @@ pub enum FetchLookup {
/// Result of a fetch request from a specific seed.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
-
pub enum FetchResult {
-
    /// Successful fetch from a seed.
-
    Fetched {
-
        from: NodeId,
-
        updated: Vec<RefUpdate>,
-
    },
-
    /// Error fetching the resource from a seed.
-
    Error { from: NodeId, error: FetchError },
-
}
-

-
impl FetchResult {
-
    /// Get the remote node id.
-
    pub fn remote(&self) -> &NodeId {
-
        match self {
-
            Self::Fetched { from, .. } => from,
-
            Self::Error { from, .. } => from,
-
        }
-
    }
+
pub struct FetchResult {
+
    pub rid: Id,
+
    pub remote: NodeId,
+
    pub result: Result<Vec<RefUpdate>, FetchError>,
}

/// Function used to query internal service state.
@@ -226,6 +212,8 @@ pub struct Service<R, A, S, G> {
    rng: Rng,
    /// Whether our local inventory no long represents what we have announced to the network.
    out_of_sync: bool,
+
    /// Fetch requests initiated by user, which are waiting for results.
+
    fetch_reqs: HashMap<Id, chan::Sender<FetchResult>>,
    /// Current tracked repository bloom filter.
    filter: Filter,
    /// Last time the service was idle.
@@ -289,6 +277,7 @@ where
            reactor: Reactor::default(),
            sessions,
            out_of_sync: false,
+
            fetch_reqs: HashMap::new(),
            filter: Filter::empty(),
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
@@ -456,8 +445,14 @@ where
                    return;
                }

-
                let Ok(seeds) = self.routing.get(&id) else {
-
                    todo!();
+
                let seeds = match self.routing.get(&id) {
+
                    Ok(seeds) => seeds,
+
                    Err(err) => {
+
                        log::error!("Error reading routing table for {id}: {err}");
+
                        resp.send(FetchLookup::NotFound).ok();
+

+
                        return;
+
                    }
                };
                let Some(seeds) = NonEmpty::from_vec(seeds.into_iter().collect()) else {
                    log::warn!("No seeds found for {}", id);
@@ -474,17 +469,12 @@ where
                })
                .ok();

+
                self.fetch_reqs.insert(id, results_send);
+

                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
-
                    let session = self.sessions.get_mut(&seed).unwrap();
-
                    if let Some(fetch) = session.fetch(id, results_send.clone()) {
-
                        self.reactor.write(session.id, fetch);
-
                        self.reactor
-
                            .fetch(session.id, id, Namespaces::default(), true);
-
                    } else {
-
                        // TODO: If we can't fetch, it's because we're already fetching from
-
                        // this peer. So we need to queue the request, or find another peer.
-
                        todo!();
+
                    if let Err(err) = self.fetch(id, seed) {
+
                        log::error!("Error initiating fetch for {id} from {seed}: {err}");
                    }
                }
            }
@@ -525,18 +515,38 @@ where
        }
    }

+
    pub fn fetch(&mut self, rid: Id, seed: NodeId) -> Result<(), Error> {
+
        let session = self.sessions.get_mut(&seed).unwrap();
+
        if let Some(fetch) = session.fetch(rid) {
+
            self.reactor.write(session.id, fetch);
+
            self.reactor
+
                .fetch(session.id, rid, Namespaces::default(), true);
+
        } else {
+
            // TODO: If we can't fetch, it's because we're already fetching from
+
            // this peer. So we need to queue the request, or find another peer.
+
            todo!();
+
        }
+
        Ok(())
+
    }
+

    pub fn repo_fetched(&mut self, result: FetchResult) {
-
        // TODO(cloudhead): handle completed job with service business logic
-
        // TODO: Downgrade session to gossip protocol.
-
        if let Some(session) = self.sessions.get_mut(result.remote()) {
-
            if let session::State::Connected { protocol, .. } = &session.state {
-
                if let session::Protocol::Fetch {
-
                    results: Some(results),
-
                } = protocol
-
                {
-
                    results.send(result).unwrap();
+
        let remote = result.remote;
+
        let rid = result.rid;
+

+
        if let Some(results) = self.fetch_reqs.get(&rid) {
+
            if results.send(result).is_err() {
+
                self.fetch_reqs.remove(&rid);
+
            }
+
        }
+
        if let Some(session) = self.sessions.get_mut(&remote) {
+
            if let session::State::Connected { protocol, .. } = &mut session.state {
+
                if *protocol == session::Protocol::Fetch {
+
                    *protocol = session::Protocol::Gossip;
                } else {
-
                    // Fetch initiated by remote, we don't need to report back.
+
                    panic!(
+
                        "Unexpected session state for {}: expected 'fetch' protocol, got 'gossip'",
+
                        session.id
+
                    );
                }
            }
        }
@@ -924,7 +934,7 @@ where
                }
            }
            (session::State::Connected { protocol, .. }, Message::Fetch { repo }) => {
-
                *protocol = Protocol::Fetch { results: None };
+
                *protocol = Protocol::Fetch;
                // Instruct the transport to handover the socket to the worker.
                self.reactor
                    .fetch(*remote, repo, Namespaces::default(), false);
modified radicle-node/src/service/session.rs
@@ -1,7 +1,6 @@
-
use crate::service::chan;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::{storage, FetchResult};
+
use crate::service::storage;
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
use crate::Link;

@@ -17,7 +16,7 @@ pub enum PingState {
}

/// Session protocol.
-
#[derive(Debug, Default, Clone)]
+
#[derive(Debug, Default, Copy, PartialEq, Eq, Clone)]
pub enum Protocol {
    /// The default message-based gossip protocol.
    #[default]
@@ -25,12 +24,7 @@ pub enum Protocol {
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
    /// [`Message::Fetch`] message.
-
    Fetch {
-
        /// Channel to send fetch results on. Set to `Some` when the fetch
-
        /// is locally initiated. Otherwise, no results need to be communicated
-
        /// back.
-
        results: Option<chan::Sender<FetchResult>>,
-
    },
+
    Fetch,
}

#[derive(Debug, Clone)]
@@ -145,12 +139,10 @@ impl Session {
        self.attempts += 1;
    }

-
    pub fn fetch(&mut self, repo: Id, results: chan::Sender<FetchResult>) -> Option<Message> {
+
    pub fn fetch(&mut self, repo: Id) -> Option<Message> {
        if let State::Connected { protocol, .. } = &mut self.state {
            if let Protocol::Gossip = protocol {
-
                *protocol = Protocol::Fetch {
-
                    results: Some(results),
-
                };
+
                *protocol = Protocol::Fetch;
                return Some(Message::Fetch { repo });
            } else {
                log::error!(
modified radicle-node/src/tests/e2e.rs
@@ -18,7 +18,7 @@ use radicle::Storage;
use radicle::{assert_matches, rad};

use crate::node::NodeId;
-
use crate::service::{FetchLookup, FetchResult};
+
use crate::service::FetchLookup;
use crate::storage::git::transport;
use crate::test::logger;
use crate::{client, client::handle::Handle, client::Runtime, service};
@@ -343,16 +343,17 @@ fn test_replication() {
    };
    assert_eq!(seeds, nonempty::NonEmpty::new(bob.id));

-
    let (from, updated) = match results.recv_timeout(Duration::from_secs(6)).unwrap() {
-
        FetchResult::Fetched { from, updated } => (from, updated),
-
        FetchResult::Error { from, error } => {
-
            panic!("Fetch failed from {from}: {error}");
+
    let result = results.recv_timeout(Duration::from_secs(6)).unwrap();
+
    let updated = match result.result {
+
        Ok(updated) => updated,
+
        Err(err) => {
+
            panic!("Fetch failed from {}: {err}", result.remote);
        }
    };
-
    assert_eq!(from, bob.id);
+
    assert_eq!(result.remote, bob.id);
    assert_eq!(updated, vec![]);

-
    log::debug!(target: "test", "Fetch complete with {}", from);
+
    log::debug!(target: "test", "Fetch complete with {}", result.remote);

    let inventory = alice.handle.inventory().unwrap();
    let alice_refs = alice
modified radicle-node/src/worker.rs
@@ -57,15 +57,10 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        } = task;

        let (session, result) = self._process(&fetch, drain, session);
-
        let result = match result {
-
            Ok(updated) => FetchResult::Fetched {
-
                from: fetch.remote,
-
                updated,
-
            },
-
            Err(error) => FetchResult::Error {
-
                from: fetch.remote,
-
                error,
-
            },
+
        let result = FetchResult {
+
            rid: fetch.repo,
+
            remote: fetch.remote,
+
            result,
        };
        log::debug!(target: "worker", "Sending response back to service..");