Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Improve error handling in a few places
Alexis Sellier committed 3 years ago
commit 832c6ad53f8ec6c3cda6965e49c8176a72d25a4f
parent 78545c2f0f60c07e8b7315213366468964f3dec4
6 files changed +99 -44
modified radicle-node/src/service.rs
@@ -67,6 +67,17 @@ pub enum Event {
    },
}

+
/// General service error.
+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
    #[error(transparent)]
+
    Fetch(#[from] storage::FetchError),
+
    #[error(transparent)]
+
    Routing(#[from] routing::Error),
+
}
+

/// Error returned by [`Command::Fetch`].
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
@@ -314,11 +325,12 @@ where
        &mut self.reactor
    }

-
    pub fn lookup(&self, id: Id) -> Result<Lookup, routing::Error> {
+
    /// Lookup a project, both locally and in the routing table.
+
    pub fn lookup(&self, id: Id) -> Result<Lookup, LookupError> {
        let remote = self.routing.get(&id)?.iter().cloned().collect();

        Ok(Lookup {
-
            local: self.storage.get(&self.node_id(), id).unwrap(),
+
            local: self.storage.get(&self.node_id(), id)?,
            remote,
        })
    }
@@ -362,7 +374,9 @@ where
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            if self.out_of_sync {
-
                self.announce_inventory().unwrap();
+
                if let Err(err) = self.announce_inventory() {
+
                    error!("Error announcing inventory: {}", err);
+
                }
            }
            self.reactor.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
@@ -370,7 +384,9 @@ where
        if now - self.last_prune >= PRUNE_INTERVAL {
            debug!("Running 'prune' task...");

-
            self.prune_routing_entries();
+
            if let Err(err) = self.prune_routing_entries() {
+
                error!("Error pruning routing entries: {}", err);
+
            }
            self.reactor.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }
@@ -451,20 +467,9 @@ where
                resp.send(self.untrack(id)).ok();
            }
            Command::AnnounceRefs(id) => {
-
                let node = self.node_id();
-
                let repo = self.storage.repository(id).unwrap();
-
                let remote = repo.remote(&node).unwrap();
-
                let peers = self.sessions.negotiated().map(|(_, p)| p);
-
                let refs = remote.refs.into();
-
                let timestamp = self.clock.timestamp();
-
                let msg = AnnouncementMessage::from(RefsAnnouncement {
-
                    id,
-
                    refs,
-
                    timestamp,
-
                });
-
                let ann = msg.signed(&self.signer);
-

-
                self.reactor.broadcast(ann, peers);
+
                if let Err(err) = self.announce_refs(id) {
+
                    error!("Error announcing refs: {}", err);
+
                }
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
@@ -524,7 +529,7 @@ where
    pub fn disconnected(
        &mut self,
        addr: &std::net::SocketAddr,
-
        reason: nakamoto::DisconnectReason<DisconnectReason>,
+
        reason: &nakamoto::DisconnectReason<DisconnectReason>,
    ) {
        let since = self.local_time();
        let address = Address::from(*addr);
@@ -593,6 +598,7 @@ where
    /// and `false` if it should not.
    pub fn handle_announcement(
        &mut self,
+
        session: &NodeId,
        git: &git::Url,
        announcement: &Announcement,
    ) -> Result<bool, peer::SessionError> {
@@ -600,17 +606,18 @@ where
            return Err(SessionError::Misbehavior);
        }
        let Announcement { node, message, .. } = announcement;
+
        let now = self.clock.local_time();
+

+
        // Don't allow messages from too far in the future.
+
        if message.timestamp().saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
            return Err(SessionError::InvalidTimestamp(message.timestamp()));
+
        }

        match message {
            AnnouncementMessage::Inventory(message) => {
-
                let now = self.clock.local_time();
                let peer = self.peers.entry(*node).or_insert_with(Peer::default);
                let relay = self.config.relay;

-
                // Don't allow messages from too far in the future.
-
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
-
                }
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
                if message.timestamp > peer.last_message {
@@ -618,8 +625,20 @@ where
                } else {
                    return Ok(false);
                }
-
                self.process_inventory(&message.inventory, *node, git)
-
                    .unwrap();
+

+
                if let Err(err) = self.process_inventory(&message.inventory, *node, git) {
+
                    error!("Error processing inventory from {}: {}", node, err);
+

+
                    if let Error::Fetch(storage::FetchError::Verify(err)) = err {
+
                        // Disconnect the peer if it is the signer of this message.
+
                        if node == session {
+
                            return Err(peer::SessionError::VerificationFailed(err));
+
                        }
+
                    }
+
                    // There's not much we can do if the peer sending us this message isn't the
+
                    // origin of it.
+
                    return Ok(false);
+
                }

                if relay {
                    return Ok(true);
@@ -632,6 +651,8 @@ where
                // TODO: Check that we're tracking this user as well.
                if self.config.is_tracking(&message.id) {
                    // TODO: Check refs to see if we should try to fetch or not.
+
                    // FIXME: This code is wrong: we shouldn't be fetching from the connected peer,
+
                    // we should fetch from the origin.
                    let updated = self.storage.fetch(message.id, git).unwrap();
                    let is_updated = !updated.is_empty();

@@ -715,11 +736,12 @@ where
                return Err(SessionError::Misbehavior);
            }
            // Process a peer announcement.
-
            (SessionState::Negotiated { git, .. }, Message::Announcement(ann)) => {
+
            (SessionState::Negotiated { id, git, .. }, Message::Announcement(ann)) => {
                let git = git.clone();
+
                let id = id.clone();

                // Returning true here means that the message should be relayed.
-
                if self.handle_announcement(&git, &ann)? {
+
                if self.handle_announcement(&id, &git, &ann)? {
                    self.gossip.received(ann.clone(), ann.message.timestamp());
                    return Ok(Some(ann));
                }
@@ -753,7 +775,7 @@ where
        inventory: &Inventory,
        from: NodeId,
        remote: &Url,
-
    ) -> Result<(), routing::Error> {
+
    ) -> Result<(), Error> {
        for proj_id in inventory {
            // TODO: Fire an event on routing update.
            if self
@@ -761,12 +783,32 @@ where
                .insert(*proj_id, from, self.clock.timestamp())?
                && self.config.is_tracking(proj_id)
            {
-
                self.storage.fetch(*proj_id, remote).unwrap();
+
                self.storage.fetch(*proj_id, remote)?;
            }
        }
        Ok(())
    }

+
    /// Announce local refs for given id.
+
    fn announce_refs(&mut self, id: Id) -> Result<(), storage::Error> {
+
        let node = self.node_id();
+
        let repo = self.storage.repository(id)?;
+
        let remote = repo.remote(&node)?;
+
        let peers = self.sessions.negotiated().map(|(_, p)| p);
+
        let refs = remote.refs.into();
+
        let timestamp = self.clock.timestamp();
+
        let msg = AnnouncementMessage::from(RefsAnnouncement {
+
            id,
+
            refs,
+
            timestamp,
+
        });
+
        let ann = msg.signed(&self.signer);
+

+
        self.reactor.broadcast(ann, peers);
+

+
        Ok(())
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////
@@ -785,8 +827,9 @@ where
        Ok(())
    }

-
    fn prune_routing_entries(&mut self) {
+
    fn prune_routing_entries(&mut self) -> Result<(), storage::Error> {
        // TODO
+
        Ok(())
    }

    fn maintain_connections(&mut self) {
@@ -848,7 +891,7 @@ where
    }
}

-
#[derive(Debug, Clone)]
+
#[derive(Debug)]
pub enum DisconnectReason {
    User,
    Error(SessionError),
@@ -895,6 +938,14 @@ pub struct Lookup {
    pub remote: Vec<NodeId>,
}

+
#[derive(thiserror::Error, Debug)]
+
pub enum LookupError {
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
    #[error(transparent)]
+
    Routing(#[from] routing::Error),
+
}
+

/// Information on a peer, that we may or may not be connected to.
#[derive(Default, Debug)]
pub struct Peer {
modified radicle-node/src/service/peer.rs
@@ -23,7 +23,7 @@ pub enum SessionState {
    Disconnected { since: LocalTime },
}

-
#[derive(thiserror::Error, Debug, Clone)]
+
#[derive(thiserror::Error, Debug)]
pub enum SessionError {
    #[error("wrong network constant in message: {0}")]
    WrongMagic(u32),
@@ -33,6 +33,8 @@ pub enum SessionError {
    InvalidTimestamp(u64),
    #[error("session not found for address `{0}`")]
    NotFound(net::IpAddr),
+
    #[error("verification failed on fetch: {0}")]
+
    VerificationFailed(#[from] storage::VerifyError),
    #[error("peer misbehaved")]
    Misbehavior,
}
modified radicle-node/src/test/simulator.rs
@@ -7,6 +7,7 @@ pub mod arbitrary;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
+
use std::rc::Rc;
use std::{fmt, io, net};

use log::*;
@@ -56,7 +57,7 @@ pub enum Input {
    /// Disconnected from peer.
    Disconnected(
        net::SocketAddr,
-
        nakamoto::DisconnectReason<DisconnectReason>,
+
        Rc<nakamoto::DisconnectReason<DisconnectReason>>,
    ),
    /// Received a message from a remote peer.
    Received(net::SocketAddr, Vec<Envelope>),
@@ -393,7 +394,7 @@ impl<S: WriteStorage + 'static> Simulation<S> {
                        assert!(!(attempt && connection));

                        if attempt || connection {
-
                            p.disconnected(&addr, reason);
+
                            p.disconnected(&addr, &reason);
                        }
                    }
                    Input::Wake => p.wake(),
@@ -501,9 +502,9 @@ impl<S: WriteStorage + 'static> Simulation<S> {
                                remote,
                                input: Input::Disconnected(
                                    remote,
-
                                    nakamoto::DisconnectReason::ConnectionError(
+
                                    Rc::new(nakamoto::DisconnectReason::ConnectionError(
                                        io::Error::from(io::ErrorKind::UnexpectedEof).into(),
-
                                    ),
+
                                    )),
                                ),
                            },
                        );
@@ -543,7 +544,7 @@ impl<S: WriteStorage + 'static> Simulation<S> {
                self.priority.push_back(Scheduled {
                    remote,
                    node,
-
                    input: Input::Disconnected(remote, reason.into()),
+
                    input: Input::Disconnected(remote, Rc::new(reason.into())),
                });

                // Nb. It's possible for disconnects to happen simultaneously from both ends, hence
@@ -569,9 +570,9 @@ impl<S: WriteStorage + 'static> Simulation<S> {
                        remote: local_addr,
                        input: Input::Disconnected(
                            local_addr,
-
                            nakamoto::DisconnectReason::ConnectionError(
+
                            Rc::new(nakamoto::DisconnectReason::ConnectionError(
                                io::Error::from(io::ErrorKind::ConnectionReset).into(),
-
                            ),
+
                            )),
                        ),
                    },
                );
modified radicle-node/src/test/tests.rs
@@ -420,14 +420,14 @@ fn test_persistent_peer_reconnect() {
    // a reconnection.
    alice.disconnected(
        &eve.addr(),
-
        nakamoto::DisconnectReason::DialError(error.clone()),
+
        &nakamoto::DisconnectReason::DialError(error.clone()),
    );
    assert_matches!(alice.outbox().next(), None);

    for _ in 0..MAX_CONNECTION_ATTEMPTS {
        alice.disconnected(
            &bob.addr(),
-
            nakamoto::DisconnectReason::ConnectionError(error.clone()),
+
            &nakamoto::DisconnectReason::ConnectionError(error.clone()),
        );
        assert_matches!(alice.outbox().next(), Some(Io::Connect(a)) if a == bob.addr());
        assert_matches!(alice.outbox().next(), None);
@@ -438,7 +438,7 @@ fn test_persistent_peer_reconnect() {
    // After the max connection attempts, a disconnect doesn't trigger a reconnect.
    alice.disconnected(
        &bob.addr(),
-
        nakamoto::DisconnectReason::ConnectionError(error),
+
        &nakamoto::DisconnectReason::ConnectionError(error),
    );
    assert_matches!(alice.outbox().next(), None);
}
modified radicle-node/src/wire.rs
@@ -491,7 +491,7 @@ where
        reason: nakamoto::DisconnectReason<service::DisconnectReason>,
    ) {
        self.inboxes.remove(&addr.ip());
-
        self.inner.disconnected(addr, reason)
+
        self.inner.disconnected(addr, &reason)
    }

    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
modified radicle/src/storage.rs
@@ -9,6 +9,7 @@ use std::{fmt, io};

use thiserror::Error;

+
pub use git::VerifyError;
pub use radicle_git_ext::Oid;

use crate::collections::HashMap;