Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: use enum for fetch direction
Fintan Halpenny committed 3 years ago
commit e408b0d62cfed949b9fb1506a3d2056bd6d56735
parent c3a5aced07d230f3ec909533a9e6d68b88a1246c
6 files changed +197 -143
modified radicle-node/src/service.rs
@@ -32,6 +32,7 @@ use crate::node::{Address, Features, FetchResult, Seed, Seeds};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
+
use crate::service::reactor::FetchDirection;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::storage::{Namespaces, ReadStorage};
@@ -524,7 +525,7 @@ where
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
-
                if let Err(err) = self.announce_refs(id, Namespaces::One(self.node_id())) {
+
                if let Err(err) = self.announce_refs(id, &Namespaces::One(self.node_id())) {
                    error!("Error announcing refs: {}", err);
                }
            }
@@ -586,92 +587,73 @@ where
    pub fn fetched(&mut self, fetch: Fetch, result: Result<Vec<RefUpdate>, FetchError>) {
        let remote = fetch.remote;
        let rid = fetch.rid;
-
        let initiated = fetch.initiated;

-
        if initiated {
-
            let result = match result {
-
                Ok(updated) => {
-
                    log::debug!(target: "service", "Fetched {rid} from {remote}");
-

-
                    self.reactor.event(Event::RefsFetched {
-
                        remote,
-
                        rid,
-
                        updated: updated.clone(),
-
                    });
-
                    FetchResult::Success { updated }
-
                }
-
                Err(err) => {
-
                    let reason = err.to_string();
-
                    error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
-

-
                    // For now, we only disconnect the remote in case of timeout. In the future,
-
                    // there may be other reasons to disconnect.
-
                    if err.is_timeout() {
-
                        self.reactor
-
                            .disconnect(remote, DisconnectReason::Fetch(err));
+
        match fetch.direction {
+
            FetchDirection::Initiator { namespaces } => {
+
                let result = match result {
+
                    Ok(updated) => {
+
                        log::debug!(target: "service", "Fetched {rid} from {remote}");
+

+
                        self.reactor.event(Event::RefsFetched {
+
                            remote,
+
                            rid,
+
                            updated: updated.clone(),
+
                        });
+
                        FetchResult::Success { updated }
                    }
-
                    FetchResult::Failed { reason }
-
                }
-
            };
+
                    Err(err) => {
+
                        let reason = err.to_string();
+
                        error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
+

+
                        // For now, we only disconnect the remote in case of timeout. In the future,
+
                        // there may be other reasons to disconnect.
+
                        if err.is_timeout() {
+
                            self.reactor
+
                                .disconnect(remote, DisconnectReason::Fetch(err));
+
                        }
+
                        FetchResult::Failed { reason }
+
                    }
+
                };

-
            if let Some(results) = self.fetch_reqs.get(&rid) {
-
                log::debug!(target: "service", "Found existing fetch request, sending result..");
+
                if let Some(results) = self.fetch_reqs.get(&rid) {
+
                    log::debug!(target: "service", "Found existing fetch request, sending result..");

-
                if results.send(result).is_err() {
-
                    log::error!(target: "service", "Error sending fetch result for {rid}..");
-
                    // FIXME: We should remove the channel even on success, once all seeds
-
                    // were fetched from. Otherwise an organic fetch will try to send on the
-
                    // channel.
-
                    self.fetch_reqs.remove(&rid);
+
                    if results.send(result).is_err() {
+
                        log::error!(target: "service", "Error sending fetch result for {rid}..");
+
                        // FIXME: We should remove the channel even on success, once all seeds
+
                        // were fetched from. Otherwise an organic fetch will try to send on the
+
                        // channel.
+
                        self.fetch_reqs.remove(&rid);
+
                    } else {
+
                        log::debug!(target: "service", "Sent fetch result for {rid}..");
+
                    }
                } else {
-
                    log::debug!(target: "service", "Sent fetch result for {rid}..");
-
                }
-
            } else {
-
                log::debug!(target: "service", "No fetch requests found for {rid}..");
-

-
                // We only announce refs here when the fetch wasn't user-requested. This is
-
                // because the user might want to announce his fork, once he has created one,
-
                // or may choose to not announce anything.
-
                if let Err(e) = self.announce_refs(rid, fetch.namespaces) {
-
                    error!(target: "service", "Failed to announce new refs: {e}");
-
                }
-
            }
-
        }
-

-
        if let Some(session) = self.sessions.get_mut(&remote) {
-
            // Transition session back to gossip protocol.
-
            session.to_gossip();
-
            // Drain any messages in the session's outbox, which might have accumulated during the
-
            // fetch, and send them to the peer.
-
            self.reactor.drain(session);
-
        } else {
-
            log::debug!(target: "service", "Session not found for {remote}");
-
        }
+
                    log::debug!(target: "service", "No fetch requests found for {rid}..");

-
        // Nb. This block needs to be run after we've switched back to the gossip protocol,
-
        // otherwise the messages will be queued.
-
        if initiated {
-
            // TODO: Since this fetch could be either a full clone or simply a ref update, we need
-
            // to either announce new inventory, or new refs. Right now, we announce both in some
-
            // cases.
-

-
            // Announce the newly fetched repository to the network, if necessary.
-
            match self.sync_inventory() {
-
                Ok(updated) => {
-
                    if !updated.is_empty() {
-
                        if let Err(e) = self
-
                            .storage
-
                            .inventory()
-
                            .and_then(|i| self.announce_inventory(i))
-
                        {
-
                            error!(target: "service", "Failed to announce inventory: {e}");
-
                        }
+
                    // We only announce refs here when the fetch wasn't user-requested. This is
+
                    // because the user might want to announce his fork, once he has created one,
+
                    // or may choose to not announce anything.
+
                    if let Err(e) = self.announce_refs(rid, &namespaces) {
+
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
-
                Err(e) => {
-
                    error!(target: "service", "Failed to sync inventory: {e}");
-
                }
+

+
                self.switch_to_gossip(remote);
+

+
                // TODO: Since this fetch could be either a full clone
+
                // or simply a ref update, we need to either announce
+
                // new inventory, or new refs. Right now, we announce
+
                // both in some cases.
+
                //
+
                // Announce the newly fetched repository to the
+
                // network, if necessary.
+
                //
+
                // Nb. This needs to be run after we've switched back
+
                // to the gossip protocol, otherwise the messages will
+
                // be queued.
+
                self.sync_and_announce();
            }
+
            FetchDirection::Responder => self.switch_to_gossip(remote),
        }
    }

@@ -1092,7 +1074,7 @@ where
                }
                // Accept the request and instruct the transport to handover the socket to the worker.
                self.reactor.write(peer, Message::FetchOk { rid });
-
                self.reactor.fetch(peer, rid, Namespaces::default(), false);
+
                self.reactor.fetch(peer, rid, FetchDirection::Responder);
            }
            (session::State::Connected { protocol, .. }, Message::FetchOk { rid }) => {
                if *protocol
@@ -1111,7 +1093,13 @@ where
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

                // Instruct the transport to handover the socket to the worker.
-
                self.reactor.fetch(peer, rid, Namespaces::default(), true);
+
                self.reactor.fetch(
+
                    peer,
+
                    rid,
+
                    FetchDirection::Initiator {
+
                        namespaces: Namespaces::default(),
+
                    },
+
                );
            }
            (session::State::Connecting { .. }, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
@@ -1186,7 +1174,7 @@ where
    }

    /// Announce local refs for given id.
-
    fn announce_refs(&mut self, rid: Id, namespaces: Namespaces) -> Result<(), storage::Error> {
+
    fn announce_refs(&mut self, rid: Id, namespaces: &Namespaces) -> Result<(), storage::Error> {
        let repo = self.storage.repository(rid)?;
        let peers = self.sessions.connected().map(|(_, p)| p);
        let timestamp = self.time();
@@ -1206,7 +1194,7 @@ where
                }
            }
            Namespaces::One(pk) => refs
-
                .push((pk, repo.remote(&pk)?.refs.unverified()))
+
                .push((*pk, repo.remote(pk)?.refs.unverified()))
                // SAFETY: `REF_REMOTE_LIMIT` is greater than 1, thus the bounded vec can hold at least
                // one remote.
                .unwrap(),
@@ -1224,6 +1212,38 @@ where
        Ok(())
    }

+
    fn switch_to_gossip(&mut self, remote: NodeId) {
+
        if let Some(session) = self.sessions.get_mut(&remote) {
+
            // Transition session back to gossip protocol.
+
            session.to_gossip();
+
            // Drain any messages in the session's outbox, which might
+
            // have accumulated during a fetch, and send them to the
+
            // peer.
+
            self.reactor.drain(session);
+
        } else {
+
            log::debug!(target: "service", "Session not found for {remote}");
+
        }
+
    }
+

+
    fn sync_and_announce(&mut self) {
+
        match self.sync_inventory() {
+
            Ok(updated) => {
+
                if !updated.is_empty() {
+
                    if let Err(e) = self
+
                        .storage
+
                        .inventory()
+
                        .and_then(|i| self.announce_inventory(i))
+
                    {
+
                        error!(target: "service", "Failed to announce inventory: {e}");
+
                    }
+
                }
+
            }
+
            Err(e) => {
+
                error!(target: "service", "Failed to sync inventory: {e}");
+
            }
+
        }
+
    }
+

    fn connect(&mut self, node: NodeId, addr: Address) -> bool {
        if self.sessions.is_disconnected(&node) {
            self.reactor.connect(node, addr);
modified radicle-node/src/service/reactor.rs
@@ -31,12 +31,42 @@ pub enum Io {
pub struct Fetch {
    /// Repo to fetch.
    pub rid: Id,
-
    /// Namespaces to fetch.
-
    pub namespaces: Namespaces,
+
    /// Indicates whether the fetch request was initiated or is a response.
+
    pub direction: FetchDirection,
    /// Remote peer we are interacting with.
    pub remote: NodeId,
-
    /// Indicates whether the fetch request was initiated by us.
-
    pub initiated: bool,
+
}
+

+
impl Fetch {
+
    pub fn is_initiator(&self) -> bool {
+
        self.direction.is_initiator()
+
    }
+

+
    pub fn initiated(&self) -> Option<&Namespaces> {
+
        match &self.direction {
+
            FetchDirection::Initiator { namespaces } => Some(namespaces),
+
            FetchDirection::Responder => None,
+
        }
+
    }
+
}
+

+
#[derive(Debug, Clone)]
+
pub enum FetchDirection {
+
    /// Client is initiating a fetch in order to receive the specified
+
    /// `refspecs` determined by [`Namespaces`].
+
    Initiator {
+
        /// Namespaces to fetch.
+
        namespaces: Namespaces,
+
    },
+
    /// Server is responding to a fetch request by uploading the
+
    /// specified `refspecs` sent by the client.
+
    Responder,
+
}
+

+
impl FetchDirection {
+
    pub fn is_initiator(&self) -> bool {
+
        matches!(self, Self::Initiator { .. })
+
    }
}

/// Interface to the network reactor.
@@ -123,21 +153,14 @@ impl Reactor {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(
-
        &mut self,
-
        remote: &mut Session,
-
        rid: Id,
-
        namespaces: Namespaces,
-
        initiated: bool,
-
    ) {
+
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, direction: FetchDirection) {
        // Transition the session state machine to "fetching".
        remote.to_fetching(rid);

        self.io.push_back(Io::Fetch(Fetch {
            rid,
-
            namespaces,
+
            direction,
            remote: remote.id,
-
            initiated,
        }));
    }

modified radicle-node/src/test/simulator.rs
@@ -109,7 +109,10 @@ impl fmt::Display for Scheduled {
                write!(
                    f,
                    "{} <<~ {} ({}): Fetched (initiated={})",
-
                    self.node, fetch.remote, fetch.rid, fetch.initiated
+
                    self.node,
+
                    fetch.remote,
+
                    fetch.rid,
+
                    fetch.is_initiator()
                )
            }
        }
@@ -410,13 +413,13 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    }
                    Input::Fetched(f, result) => {
                        let result = Rc::try_unwrap(result).unwrap();
-
                        if f.initiated {
+
                        if let Some(namespaces) = f.initiated() {
                            let mut repo = match p.storage().repository_mut(f.rid) {
                                Ok(repo) => repo,
                                Err(e) if e.is_not_found() => p.storage().create(f.rid).unwrap(),
                                Err(e) => panic!("Failed to open repository: {e}"),
                            };
-
                            fetch(&mut repo, &f.remote, f.namespaces.clone()).unwrap();
+
                            fetch(&mut repo, &f.remote, namespaces.clone()).unwrap();
                        }
                        p.fetched(f, result);
                    }
@@ -614,7 +617,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
            Io::Fetch(fetch) => {
                let remote = fetch.remote;

-
                if fetch.initiated {
+
                if fetch.is_initiator() {
                    log::info!(
                        target: "sim",
                        "{:05} {} ~> {} ({}): Fetch outgoing",
modified radicle-node/src/tests.rs
@@ -16,6 +16,7 @@ use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
use crate::service::message::*;
+
use crate::service::reactor::FetchDirection;
use crate::service::reactor::Io;
use crate::service::ServiceState as _;
use crate::service::*;
@@ -646,9 +647,10 @@ fn test_gossip_during_fetch() {
    alice.fetched(
        Fetch {
            rid,
-
            namespaces: Namespaces::All,
+
            direction: FetchDirection::Initiator {
+
                namespaces: Namespaces::All,
+
            },
            remote: bob.id,
-
            initiated: true,
        },
        Ok(vec![]),
    );
modified radicle-node/src/wire/protocol.rs
@@ -110,7 +110,7 @@ impl std::fmt::Debug for Peer {
            } => write!(
                f,
                "Upgrading(initiated={}, {link:?}, {id})",
-
                fetch.initiated
+
                fetch.is_initiator(),
            ),
            Self::Upgraded { link, id, .. } => write!(f, "Upgraded({link:?}, {id})"),
        }
modified radicle-node/src/worker.rs
@@ -14,7 +14,7 @@ use radicle::{git, Storage};
use reactor::poller::popol;

use crate::runtime::Handle;
-
use crate::service::reactor::Fetch;
+
use crate::service::reactor::{Fetch, FetchDirection};
use crate::storage;
use crate::wire::{WireReader, WireSession, WireWriter};

@@ -121,54 +121,60 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        drain: Vec<u8>,
        mut session: WireSession<G>,
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
-
        if fetch.initiated {
-
            log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.rid);
-

-
            let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
-
                Ok(tunnel) => tunnel,
-
                Err((session, err)) => return (session, Err(err.into())),
-
            };
-
            let result = self.fetch(fetch, &mut tunnel);
-
            let mut session = tunnel.into_session();
-

-
            // If there are no errors, send a `done` special packet. We don't send this on error,
-
            // as the remote will not be expecting it.
-
            if let Err(err) = &result {
-
                log::error!(target: "worker", "Fetch error: {err}");
-
            } else if let Err(err) = pktline::done(&mut session) {
-
                log::error!(target: "worker", "Fetch error: {err}");
+
        let rid = fetch.rid;
+
        match &fetch.direction {
+
            FetchDirection::Initiator { namespaces } => {
+
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.rid);
+

+
                let mut tunnel =
+
                    match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
+
                        Ok(tunnel) => tunnel,
+
                        Err((session, err)) => return (session, Err(err.into())),
+
                    };
+
                let result = self.fetch(rid, namespaces, &mut tunnel);
+
                let mut session = tunnel.into_session();
+

+
                // If there are no errors, send a `done` special packet. We don't send this on error,
+
                // as the remote will not be expecting it.
+
                if let Err(err) = &result {
+
                    log::error!(target: "worker", "Fetch error: {err}");
+
                } else if let Err(err) = pktline::done(&mut session) {
+
                    log::error!(target: "worker", "Fetch error: {err}");
+
                }
+
                (session, result)
            }
-
            (session, result)
-
        } else {
-
            log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.rid);
+
            FetchDirection::Responder => {
+
                log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.rid);

-
            if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
-
                return (session, Err(err.into()));
-
            }
-
            let (mut stream_r, mut stream_w) = match session.split_io() {
-
                Ok((r, w)) => (r, w),
-
                Err(err) => {
-
                    return (err.original, Err(err.error.into()));
+
                if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
+
                    return (session, Err(err.into()));
                }
-
            };
-
            let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
-
            let session = WireSession::from_split_io(stream_r, stream_w);
+
                let (mut stream_r, mut stream_w) = match session.split_io() {
+
                    Ok((r, w)) => (r, w),
+
                    Err(err) => {
+
                        return (err.original, Err(err.error.into()));
+
                    }
+
                };
+
                let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
+
                let session = WireSession::from_split_io(stream_r, stream_w);

-
            if let Err(err) = &result {
-
                log::error!(target: "worker", "Upload-pack error: {err}");
+
                if let Err(err) = &result {
+
                    log::error!(target: "worker", "Upload-pack error: {err}");
+
                }
+
                (session, result)
            }
-
            (session, result)
        }
    }

    fn fetch(
        &self,
-
        fetch: &Fetch,
+
        rid: Id,
+
        namespaces: &Namespaces,
        tunnel: &mut Tunnel<WireSession<G>>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let repo = match self.storage.repository_mut(fetch.rid) {
+
        let repo = match self.storage.repository_mut(rid) {
            Ok(r) => Ok(r),
-
            Err(e) if e.is_not_found() => self.storage.create(fetch.rid),
+
            Err(e) if e.is_not_found() => self.storage.create(rid),
            Err(e) => Err(e),
        }?;
        let tunnel_addr = tunnel.local_addr()?;
@@ -181,7 +187,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            .arg("fetch")
            .arg("--verbose");

-
        match fetch.namespaces {
+
        match namespaces {
            Namespaces::All => {
                // We should not prune in this case, because it would mean that namespaces that
                // don't exit on the remote would be deleted locally.
@@ -199,7 +205,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        }
        cmd.arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
            // FIXME: We need to omit our own namespace from this refspec in case we're fetching '*'.
-
            .arg(fetch.namespaces.as_fetchspec())
+
            .arg(namespaces.as_fetchspec())
            .stdout(process::Stdio::piped())
            .stderr(process::Stdio::piped())
            .stdin(process::Stdio::piped());
@@ -219,13 +225,13 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {

        // TODO: Parse fetch output to return updates.
        if child.wait()?.success() {
-
            log::debug!(target: "worker", "Fetch for {} exited successfully", fetch.rid);
+
            log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
        } else {
-
            log::error!(target: "worker", "Fetch for {} failed", fetch.rid);
+
            log::error!(target: "worker", "Fetch for {} failed", rid);
        }
        let head = repo.set_head()?;

-
        log::debug!(target: "worker", "Head for {} set to {head}", fetch.rid);
+
        log::debug!(target: "worker", "Head for {} set to {head}", rid);

        Ok(vec![])
    }