Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fail fast when repository can't be read
Alexis Sellier committed 3 years ago
commit 5828b917f9bb9f3251284ef7f73036f1c4d28a44
parent bd436d436936c72996d4dc051ace39dd04316940
5 files changed +77 -45
modified radicle-node/src/service.rs
@@ -32,6 +32,7 @@ use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::reactor::FetchDirection;
+
use crate::service::session::GossipState;
use crate::service::tracking::Scope;
use crate::storage;
use crate::storage::{Namespaces, ReadStorage};
@@ -538,8 +539,22 @@ where
            session::FetchResult::Ready(fetch) => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

-
                self.reactor.write(session, fetch);
-
                session.to_requesting(rid);
+
                match self.tracking.namespaces_for(&self.storage, &rid) {
+
                    Ok(ns) => {
+
                        self.reactor.write(session, fetch);
+
                        session.to_requesting(rid, ns);
+
                    }
+
                    Err(err) => {
+
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
+

+
                        if let Some(resp) = self.fetch_reqs.get(&rid) {
+
                            resp.send(FetchResult::Failed {
+
                                reason: err.to_string(),
+
                            })
+
                            .ok();
+
                        }
+
                    }
+
                };
            }
            session::FetchResult::AlreadyFetching(other) => {
                if other == rid {
@@ -693,8 +708,8 @@ where

        // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
        // return a failure to any potential fetcher.
-
        if let Some(requested) = session.requesting() {
-
            if let Some(resp) = self.fetch_reqs.remove(&requested) {
+
        if let Some((requested, _)) = session.requesting() {
+
            if let Some(resp) = self.fetch_reqs.remove(requested) {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
@@ -1027,7 +1042,7 @@ where
            }
            (
                session::State::Connected {
-
                    protocol: session::Protocol::Gossip { requested },
+
                    protocol: session::Protocol::Gossip { state },
                    ..
                },
                Message::Fetch { rid },
@@ -1038,8 +1053,8 @@ where

                // We got a fetch request right after sending our own. We have to decide on which
                // fetch to run: our own, or the remote's.
-
                if let Some(req) = requested {
-
                    debug!(target: "service", "Received fetch request from {remote} while attempting to fetch {req}..");
+
                if let GossipState::Requesting { rid, .. } = state {
+
                    debug!(target: "service", "Received fetch request from {remote} while attempting to fetch {rid}..");

                    // When fetch requests cross, the inbound peer takes precedence.
                    if peer.link.is_inbound() {
@@ -1047,7 +1062,7 @@ where

                        // Cancel our own fetch request. This doesn't send anything to the remote,
                        // it simply updates the local session's state machine.
-
                        *requested = None;
+
                        *state = GossipState::Idle;

                        // TODO: Queue the fetch request as if we tried to request twice from
                        // the same node.
@@ -1064,11 +1079,9 @@ where
                self.reactor.fetch(peer, rid, FetchDirection::Responder);
            }
            (session::State::Connected { protocol, .. }, Message::FetchOk { rid }) => {
-
                if *protocol
-
                    != (session::Protocol::Gossip {
-
                        requested: Some(rid),
-
                    })
-
                {
+
                let session::Protocol::Gossip {
+
                    state: GossipState::Requesting { rid: requested, namespaces }
+
                } = protocol else {
                    // As long as we disconnect peers who don't respond to our fetch requests within
                    // the alloted time, this shouldn't happen by mistake.
                    error!(
@@ -1076,21 +1089,19 @@ where
                        peer.id
                    );
                    return Err(session::Error::Misbehavior);
+
                };
+

+
                if *requested != rid {
+
                    error!(
+
                        "Received `fetch-ok` from {} for incorrect repository {rid}",
+
                        peer.id
+
                    );
+
                    return Err(session::Error::Misbehavior);
                }
+
                let namespaces = namespaces.clone();
+

                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

-
                let namespaces = match self.tracking.namespaces_for(&self.storage, &rid) {
-
                    Ok(ns) => ns,
-
                    Err(err) => {
-
                        if let Some(resp) = self.fetch_reqs.get(&rid) {
-
                            resp.send(FetchResult::Failed {
-
                                reason: err.to_string(),
-
                            })
-
                            .ok();
-
                        }
-
                        return Ok(());
-
                    }
-
                };
                // Instruct the transport to handover the socket to the worker.
                self.reactor
                    .fetch(peer, rid, FetchDirection::Initiator { namespaces });
modified radicle-node/src/service/session.rs
@@ -1,5 +1,7 @@
use std::fmt;

+
use radicle::storage::Namespaces;
+

use crate::service::message;
use crate::service::message::Message;
use crate::service::storage;
@@ -17,11 +19,21 @@ pub enum PingState {
    Ok,
}

+
/// Sub-state of the gossip protocol.
+
#[derive(Debug, Default, PartialEq, Eq, Clone)]
+
pub enum GossipState {
+
    /// Regular gossip, no pending fetch requests.
+
    #[default]
+
    Idle,
+
    /// Requesting a fetch for the given RID. Waiting for a [`Message::FetchOk`].
+
    Requesting { rid: Id, namespaces: Namespaces },
+
}
+

/// Session protocol.
-
#[derive(Debug, Copy, PartialEq, Eq, Clone)]
+
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Protocol {
    /// The default message-based gossip protocol.
-
    Gossip { requested: Option<Id> },
+
    Gossip { state: GossipState },
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
    /// [`Message::Fetch`] message.
@@ -30,7 +42,9 @@ pub enum Protocol {

impl Default for Protocol {
    fn default() -> Self {
-
        Self::Gossip { requested: None }
+
        Self::Gossip {
+
            state: GossipState::default(),
+
        }
    }
}

@@ -70,12 +84,13 @@ impl fmt::Display for State {
            }
            Self::Connected { protocol, .. } => match protocol {
                Protocol::Gossip {
-
                    requested: None, ..
+
                    state: GossipState::Idle,
+
                    ..
                } => {
                    write!(f, "connected <gossip>")
                }
                Protocol::Gossip {
-
                    requested: Some(rid),
+
                    state: GossipState::Requesting { rid, .. },
                    ..
                } => {
                    write!(f, "connected <gossip> requested={rid}")
@@ -228,7 +243,9 @@ impl Session {
        matches!(
            self.state,
            State::Connected {
-
                protocol: Protocol::Gossip { requested: Some(_) },
+
                protocol: Protocol::Gossip {
+
                    state: GossipState::Requesting { .. }
+
                },
                ..
            }
        )
@@ -238,12 +255,12 @@ impl Session {
        self.attempts
    }

-
    pub fn fetch(&mut self, rid: Id) -> FetchResult {
-
        if let State::Connected { protocol, .. } = &mut self.state {
+
    pub fn fetch(&self, rid: Id) -> FetchResult {
+
        if let State::Connected { protocol, .. } = &self.state {
            match protocol {
-
                Protocol::Gossip { requested } => {
-
                    if let Some(requested) = requested {
-
                        FetchResult::AlreadyFetching(*requested)
+
                Protocol::Gossip { state } => {
+
                    if let GossipState::Requesting { rid, .. } = state {
+
                        FetchResult::AlreadyFetching(*rid)
                    } else {
                        FetchResult::Ready(Message::Fetch { rid })
                    }
@@ -255,12 +272,12 @@ impl Session {
        }
    }

-
    pub fn to_requesting(&mut self, rid: Id) {
+
    pub fn to_requesting(&mut self, rid: Id, namespaces: Namespaces) {
        let State::Connected { protocol, .. } = &mut self.state else {
            panic!("Session::to_requesting: cannot transition to 'requesting': session is not connected");
        };
        *protocol = Protocol::Gossip {
-
            requested: Some(rid),
+
            state: GossipState::Requesting { rid, namespaces },
        };
    }

@@ -322,13 +339,16 @@ impl Session {
        self.state = State::Initial;
    }

-
    pub fn requesting(&self) -> Option<Id> {
+
    pub fn requesting(&self) -> Option<(&Id, &Namespaces)> {
        if let State::Connected {
-
            protocol: Protocol::Gossip { requested },
+
            protocol:
+
                Protocol::Gossip {
+
                    state: GossipState::Requesting { rid, namespaces },
+
                },
            ..
-
        } = self.state
+
        } = &self.state
        {
-
            requested
+
            Some((rid, namespaces))
        } else {
            None
        }
modified radicle-node/src/service/tracking.rs
@@ -31,7 +31,7 @@ pub enum NamespacesError {
        #[source]
        err: Error,
    },
-
    #[error("Failed to get delegate nodes for {rid}")]
+
    #[error("Failed to get delegates for {rid}")]
    FailedDelegates {
        rid: Id,
        #[source]
modified radicle/src/storage.rs
@@ -29,7 +29,7 @@ pub type BranchName = git::RefString;
pub type Inventory = Vec<Id>;

/// Describes one or more namespaces.
-
#[derive(Default, Debug, Clone)]
+
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub enum Namespaces {
    /// All namespaces.
    #[default]
modified radicle/src/test/storage.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
+
use std::io;
use std::path::{Path, PathBuf};

use git_ref_format as fmt;
@@ -59,7 +60,7 @@ impl ReadStorage for MockStorage {
        let doc = self
            .inventory
            .get(&rid)
-
            .expect("Mockstorage::repository: missing doc");
+
            .ok_or_else(|| Error::Io(io::Error::from(io::ErrorKind::NotFound)))?;
        Ok(MockRepository {
            id: rid,
            doc: doc.clone(),