Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fully integrate worker fetch into service
Alexis Sellier committed 3 years ago
commit ef950d515a086615f37d1bf14a95481ec332fa26
parent ec2e7d7ee167776ca8d50b873b0da95ee6a68d28
13 files changed +176 -181
modified radicle-node/src/bounded.rs
@@ -7,7 +7,7 @@ pub enum Error {
}

/// A vector with an upper limit on its size using type level constants.
-
#[derive(Debug, Default, Clone, PartialEq, Eq)]
+
#[derive(Default, Clone, PartialEq, Eq)]
pub struct BoundedVec<T, const N: usize> {
    v: Vec<T>,
}
@@ -186,3 +186,9 @@ impl<T, const N: usize> TryFrom<Vec<T>> for BoundedVec<T, N> {
        Ok(BoundedVec { v: value })
    }
}
+

+
impl<T: std::fmt::Debug, const N: usize> std::fmt::Debug for BoundedVec<T, N> {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        self.v.fmt(f)
+
    }
+
}
modified radicle-node/src/service.rs
@@ -35,12 +35,13 @@ use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::session::Protocol;
use crate::storage;
-
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};
+
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::Link;

pub use crate::node::NodeId;
pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Message, ZeroBytes};
+
pub use crate::service::reactor::Fetch;
pub use crate::service::session::Session;

use self::gossip::Gossip;
@@ -79,8 +80,8 @@ pub use message::REF_LIMIT;
#[derive(Debug, Clone)]
pub enum Event {
    RefsFetched {
-
        from: NodeId,
-
        project: Id,
+
        remote: NodeId,
+
        rid: Id,
        updated: Vec<RefUpdate>,
    },
}
@@ -134,6 +135,7 @@ pub enum FetchLookup {
pub struct FetchResult {
    pub rid: Id,
    pub remote: NodeId,
+
    pub namespaces: Namespaces,
    pub result: Result<Vec<RefUpdate>, FetchError>,
}

@@ -473,9 +475,7 @@ where

                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
-
                    if let Err(err) = self.fetch(id, seed) {
-
                        log::error!("Error initiating fetch for {id} from {seed}: {err}");
-
                    }
+
                    self.fetch(id, &seed);
                }
            }
            Command::TrackRepo(id, resp) => {
@@ -515,33 +515,51 @@ where
        }
    }

-
    pub fn fetch(&mut self, rid: Id, seed: NodeId) -> Result<(), Error> {
-
        let session = self.sessions.get_mut(&seed).unwrap();
+
    pub fn fetch(&mut self, rid: Id, seed: &NodeId) {
+
        let Some(session) = self.sessions.get_mut(seed) else {
+
            panic!("Service::fetch: attempted to fetch from unknown peer {seed}");
+
        };
+

        if let Some(fetch) = session.fetch(rid) {
+
            debug!("Fetch initiated for {rid} with {seed}..");
+

            self.reactor.write(session.id, fetch);
-
            self.reactor
-
                .fetch(session.id, rid, Namespaces::default(), true);
        } else {
            // TODO: If we can't fetch, it's because we're already fetching from
            // this peer. So we need to queue the request, or find another peer.
-
            todo!();
+
            log::error!(
+
                "Unable to fetch {rid} from peer {seed} that is already being fetched from"
+
            );
        }
-
        Ok(())
    }

-
    pub fn repo_fetched(&mut self, result: FetchResult) {
+
    pub fn fetched(&mut self, result: FetchResult) {
        let remote = result.remote;
        let rid = result.rid;

+
        match &result.result {
+
            Ok(updated) => {
+
                self.reactor.event(Event::RefsFetched {
+
                    remote,
+
                    rid,
+
                    updated: updated.clone(),
+
                });
+
            }
+
            Err(err) => {
+
                error!("Fetch failed for {rid} from {remote}: {err}");
+
            }
+
        }
+

        if let Some(results) = self.fetch_reqs.get(&rid) {
            if results.send(result).is_err() {
                self.fetch_reqs.remove(&rid);
            }
        }
+

        if let Some(session) = self.sessions.get_mut(&remote) {
            if let session::State::Connected { protocol, .. } = &mut session.state {
                if *protocol == session::Protocol::Fetch {
-
                    *protocol = session::Protocol::Gossip;
+
                    *protocol = session::Protocol::default();
                } else {
                    panic!(
                        "Unexpected session state for {}: expected 'fetch' protocol, got 'gossip'",
@@ -734,32 +752,9 @@ where
                    // Refs are only supposed to be relayed by peers who are tracking
                    // the resource. Therefore, it's safe to fetch from the remote
                    // peer, even though it isn't the announcer.
-
                    let updated = match self
-
                        .storage
-
                        .repository(message.id)
-
                        .map_err(storage::FetchError::from)
-
                        .and_then(|mut r| r.fetch(relayer, Namespaces::default()))
-
                    {
-
                        Ok(updated) => updated,
-
                        Err(err) => {
-
                            error!(
-
                                "Error fetching repository {} from {}: {}",
-
                                message.id, relayer, err
-
                            );
-
                            return Ok(false);
-
                        }
-
                    };
-
                    let is_updated = !updated.is_empty();
+
                    self.fetch(message.id, relayer);

-
                    self.reactor.event(Event::RefsFetched {
-
                        from: *relayer,
-
                        project: message.id,
-
                        updated,
-
                    });
-

-
                    if is_updated {
-
                        return Ok(relay);
-
                    }
+
                    return Ok(true);
                } else {
                    log::debug!(
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
@@ -844,14 +839,14 @@ where
        match (&mut peer.state, message) {
            (
                session::State::Connected {
-
                    protocol: session::Protocol::Fetch { .. },
+
                    protocol: session::Protocol::Fetch,
                    ..
                },
                _,
            ) => {
                // This should never happen if the service is properly configured, since all
                // incoming data is sent directly to the Git worker.
-
                log::error!("Received gossip message from {remote} during Git fetch");
+
                log::error!("Received gossip message from {remote} during git fetch");

                return Err(session::Error::Misbehavior);
            }
@@ -933,11 +928,37 @@ where
                    }
                }
            }
-
            (session::State::Connected { protocol, .. }, Message::Fetch { repo }) => {
+
            (session::State::Connected { protocol, .. }, Message::Fetch { rid }) => {
+
                debug!("Fetch requested for {rid} from {remote}..");
+

+
                // TODO: Check that we have the repo first?
+

+
                *protocol = Protocol::Fetch;
+
                // Accept the request and instruct the transport to handover the socket to the worker.
+
                self.reactor.write(*remote, Message::FetchOk { rid });
+
                self.reactor
+
                    .fetch(*remote, rid, Namespaces::default(), false);
+
            }
+
            (session::State::Connected { protocol, .. }, Message::FetchOk { rid }) => {
+
                if *protocol
+
                    != (session::Protocol::Gossip {
+
                        requested: Some(rid),
+
                    })
+
                {
+
                    // As long as we disconnect peers who don't respond to our fetch requests within
+
                    // the alloted time, this shouldn't happen by mistake.
+
                    error!(
+
                        "Received unexpected message `fetch-ok` from peer {}",
+
                        peer.id
+
                    );
+
                    return Err(session::Error::Misbehavior);
+
                }
+
                debug!("Fetch accepted for {rid} from {remote}..");
+

                *protocol = Protocol::Fetch;
                // Instruct the transport to handover the socket to the worker.
                self.reactor
-
                    .fetch(*remote, repo, Namespaces::default(), false);
+
                    .fetch(*remote, rid, Namespaces::default(), true);
            }
            (session::State::Connecting { .. }, msg) => {
                error!("Received {:?} from connecting peer {}", msg, peer.id);
modified radicle-node/src/service/message.rs
@@ -304,8 +304,11 @@ pub enum Message {
        zeroes: ZeroBytes,
    },

-
    /// Upgrade session to Git protocol and fetch the given repository.
-
    Fetch { repo: Id },
+
    /// Request a session upgrade to the Git protocol and fetch the given repository.
+
    Fetch { rid: Id },
+

+
    /// Accept a fetch request.
+
    FetchOk { rid: Id },
}

impl Message {
@@ -391,7 +394,8 @@ impl fmt::Debug for Message {
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {:?})", zeroes),
            Self::Pong { zeroes } => write!(f, "Pong({:?})", zeroes),
-
            Self::Fetch { repo } => write!(f, "Fetch({repo})"),
+
            Self::Fetch { rid } => write!(f, "Fetch({rid})"),
+
            Self::FetchOk { rid } => write!(f, "FetchOk({rid})"),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -4,7 +4,7 @@ use log::*;

use crate::prelude::*;
use crate::service::session::Session;
-
use crate::storage::{FetchError, Namespaces, RefUpdate};
+
use crate::storage::Namespaces;

use super::message::{Announcement, AnnouncementMessage};

@@ -38,16 +38,6 @@ pub struct Fetch {
    pub initiated: bool,
}

-
/// Result of a fetch request from a specific seed.
-
#[derive(Debug)]
-
#[allow(clippy::large_enum_variant)]
-
pub enum FetchResult {
-
    /// Successful fetch from a seed.
-
    Fetched { updated: Vec<RefUpdate> },
-
    /// Error fetching the resource from a seed.
-
    Error { from: NodeId, error: FetchError },
-
}
-

/// Interface to the network reactor.
#[derive(Debug, Default)]
pub struct Reactor {
@@ -97,11 +87,6 @@ impl Reactor {
    }

    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiated: bool) {
-
        if initiated {
-
            debug!("Fetch initiated for {} with {}..", repo, remote);
-
        } else {
-
            debug!("Fetch requested for {} from {}..", repo, remote);
-
        }
        self.io.push_back(Io::Fetch(Fetch {
            repo,
            namespaces,
modified radicle-node/src/service/session.rs
@@ -16,17 +16,22 @@ pub enum PingState {
}

/// Session protocol.
-
#[derive(Debug, Default, Copy, PartialEq, Eq, Clone)]
+
#[derive(Debug, Copy, PartialEq, Eq, Clone)]
pub enum Protocol {
    /// The default message-based gossip protocol.
-
    #[default]
-
    Gossip,
+
    Gossip { requested: Option<Id> },
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
    /// [`Message::Fetch`] message.
    Fetch,
}

+
impl Default for Protocol {
+
    fn default() -> Self {
+
        Self::Gossip { requested: None }
+
    }
+
}
+

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum State {
@@ -139,16 +144,15 @@ impl Session {
        self.attempts += 1;
    }

-
    pub fn fetch(&mut self, repo: Id) -> Option<Message> {
+
    pub fn fetch(&mut self, rid: Id) -> Option<Message> {
        if let State::Connected { protocol, .. } = &mut self.state {
-
            if let Protocol::Gossip = protocol {
-
                *protocol = Protocol::Fetch;
-
                return Some(Message::Fetch { repo });
+
            if *protocol == (Protocol::Gossip { requested: None }) {
+
                *protocol = Protocol::Gossip {
+
                    requested: Some(rid),
+
                };
+
                return Some(Message::Fetch { rid });
            } else {
-
                log::error!(
-
                    "Attempted to upgrade protocol for {} which was already upgraded",
-
                    self.id
-
                );
+
                log::error!("Attempted to fetch from peer {} which isn't ready", self.id);
            }
        }
        None
modified radicle-node/src/test/simulator.rs
@@ -16,7 +16,8 @@ use crate::crypto::Signer;
use crate::prelude::Address;
use crate::service::reactor::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
-
use crate::storage::WriteStorage;
+
use crate::service::{FetchError, FetchResult};
+
use crate::storage::{WriteRepository, WriteStorage};
use crate::test::peer::Service;
use crate::Link;

@@ -59,6 +60,8 @@ pub enum Input {
    Disconnected(NodeId, Rc<DisconnectReason>),
    /// Received a message from a remote peer.
    Received(NodeId, Vec<Message>),
+
    /// Fetch completed for a node.
+
    Fetched(Arc<FetchResult>),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}
@@ -103,6 +106,13 @@ impl fmt::Display for Scheduled {
            Input::Wake => {
                write!(f, "{}: Tock", self.node)
            }
+
            Input::Fetched(result) => {
+
                write!(
+
                    f,
+
                    "{} <~ {} ({}): FetchCompleted",
+
                    self.node, result.remote, result.rid
+
                )
+
            }
        }
    }
}
@@ -399,6 +409,14 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            p.received_message(id, msg);
                        }
                    }
+
                    Input::Fetched(result) => {
+
                        let result = Arc::try_unwrap(result).unwrap();
+
                        let mut repo = p.storage().repository(result.rid).unwrap();
+

+
                        repo.fetch(&result.remote, result.namespaces.clone())
+
                            .unwrap();
+
                        p.fetched(result);
+
                    }
                }
                for o in p.by_ref() {
                    self.schedule(&node, o);
@@ -593,7 +611,37 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    events.push_back(event);
                }
            }
-
            Io::Fetch(..) => todo!("I have no idea what to do here"),
+
            Io::Fetch(fetch) => {
+
                if self.is_fallible() {
+
                    self.inbox.insert(
+
                        self.time + LocalDuration::from_secs(3),
+
                        Scheduled {
+
                            node,
+
                            remote: fetch.remote,
+
                            input: Input::Fetched(Arc::new(FetchResult {
+
                                rid: fetch.repo,
+
                                remote: fetch.remote,
+
                                namespaces: fetch.namespaces,
+
                                result: Err(FetchError::Io(io::ErrorKind::Other.into())),
+
                            })),
+
                        },
+
                    );
+
                } else {
+
                    self.inbox.insert(
+
                        self.time + LocalDuration::from_secs(3),
+
                        Scheduled {
+
                            node,
+
                            remote: fetch.remote,
+
                            input: Input::Fetched(Arc::new(FetchResult {
+
                                rid: fetch.repo,
+
                                remote: fetch.remote,
+
                                namespaces: fetch.namespaces,
+
                                result: Ok(vec![]),
+
                            })),
+
                        },
+
                    );
+
                }
+
            }
        }
    }

modified radicle-node/src/tests.rs
@@ -868,6 +868,8 @@ fn test_push_and_pull() {
    alice.command(service::Command::AnnounceRefs(proj_id));
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

+
    // TODO: Refs should be compared between the two peers.
+

    assert!(eve
        .storage()
        .get(&alice.node_id(), proj_id)
@@ -880,8 +882,8 @@ fn test_push_and_pull() {
        .is_some());
    assert_matches!(
        sim.events(&bob.id).next(),
-
        Some(service::Event::RefsFetched { from, .. })
-
        if from == eve.node_id(),
+
        Some(service::Event::RefsFetched { remote, .. })
+
        if remote == eve.node_id(),
        "Bob fetched from Eve"
    );
}
modified radicle-node/src/tests/e2e.rs
@@ -355,6 +355,8 @@ fn test_replication() {

    log::debug!(target: "test", "Fetch complete with {}", result.remote);

+
    // TODO: Have simpler way of listing all refs.
+

    let inventory = alice.handle.inventory().unwrap();
    let alice_refs = alice
        .storage
modified radicle-node/src/wire/message.rs
@@ -21,6 +21,7 @@ pub enum MessageType {
    Ping = 10,
    Pong = 12,
    Fetch = 14,
+
    FetchOk = 16,
}

impl From<MessageType> for u16 {
@@ -42,6 +43,7 @@ impl TryFrom<u16> for MessageType {
            10 => Ok(MessageType::Ping),
            12 => Ok(MessageType::Pong),
            14 => Ok(MessageType::Fetch),
+
            16 => Ok(MessageType::FetchOk),
            _ => Err(other),
        }
    }
@@ -64,6 +66,7 @@ impl Message {
            Self::Ping { .. } => MessageType::Ping,
            Self::Pong { .. } => MessageType::Pong,
            Self::Fetch { .. } => MessageType::Fetch,
+
            Self::FetchOk { .. } => MessageType::FetchOk,
        }
        .into()
    }
@@ -217,8 +220,11 @@ impl wire::Encode for Message {
            Self::Pong { zeroes } => {
                n += zeroes.encode(writer)?;
            }
-
            Self::Fetch { repo } => {
-
                n += repo.encode(writer)?;
+
            Self::Fetch { rid } => {
+
                n += rid.encode(writer)?;
+
            }
+
            Self::FetchOk { rid } => {
+
                n += rid.encode(writer)?;
            }
        }

@@ -295,8 +301,12 @@ impl wire::Decode for Message {
                Ok(Self::Pong { zeroes })
            }
            Ok(MessageType::Fetch) => {
-
                let repo = Id::decode(reader)?;
-
                Ok(Self::Fetch { repo })
+
                let rid = Id::decode(reader)?;
+
                Ok(Self::Fetch { rid })
+
            }
+
            Ok(MessageType::FetchOk) => {
+
                let rid = Id::decode(reader)?;
+
                Ok(Self::FetchOk { rid })
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
modified radicle-node/src/wire/protocol.rs
@@ -342,7 +342,7 @@ where
        peer.downgrade();

        self.actions.push_back(Action::RegisterTransport(session));
-
        self.service.repo_fetched(resp.result);
+
        self.service.fetched(resp.result);
    }
}

modified radicle-node/src/worker.rs
@@ -60,6 +60,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        let result = FetchResult {
            rid: fetch.repo,
            remote: fetch.remote,
+
            namespaces: fetch.namespaces,
            result,
        };
        log::debug!(target: "worker", "Sending response back to service..");
@@ -125,6 +126,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            .arg("--atomic")
            .arg("--verbose")
            .arg(format!("git://{tunnel_addr}/{}", repo.id))
+
            // FIXME: We need to omit our own namespace from this refspec in case we're fetching '*'.
            .arg(fetch.namespaces.as_fetchspec())
            .stdout(process::Stdio::piped())
            .stderr(process::Stdio::piped())
modified radicle/src/storage/git.rs
@@ -631,13 +631,19 @@ impl WriteRepository for Repository {
            opts.remote_callbacks(callbacks);

            let refspec = if let Some(namespace) = namespace {
+
                // TODO: Make sure we verify before pruning, as pruning may get us into
+
                // a state we can't roll back.
+
                opts.prune(git2::FetchPrune::On);
+

                format!("refs/namespaces/{namespace}/refs/*:refs/namespaces/{namespace}/refs/*")
            } else {
+
                // We should not prune in this case, because it would mean that namespaces that
+
                // don't exit on the remote would be deleted locally.
+
                opts.prune(git2::FetchPrune::Off);
+

+
                // FIXME: We need to omit our own namespace from this refspec.
                "refs/namespaces/*:refs/namespaces/*".to_owned()
            };
-
            // TODO: Make sure we verify before pruning, as pruning may get us into
-
            // a state we can't roll back.
-
            opts.prune(git2::FetchPrune::On);
            // Fetch from the staging copy into the canonical repo.
            remote.fetch(&[refspec], Some(&mut opts), None)?;
        }
modified radicle/src/storage/git/transport/remote.rs
@@ -1,100 +1,5 @@
//! Git sub-transport used for fetching radicle data.
-
//!
-
//! To have control over the communication, and to allow git streams to be multiplexed over
-
//! existing TCP connections, we implement the [`git2::transport::SmartSubtransport`] trait.
-
//!
-
//! We choose `heartwood` as the URL scheme for this custom transport, and include the node we'd
-
//! like to fetch from, as well as the repository. We expect the TCP stream to already be
-
//! established when this transport is called.
-
//!
-
//! We then maintain a map from node identifier to stream, for all active TCP connections. When a
-
//! URL is requested, we lookup the associated stream and return it to the [`git2`] smart-protocol
-
//! implementation, so that it can carry out the git smart protocol.
-
//!
-
//! This module is meant to be used by registering streams with [`register`].
pub mod mock;
pub mod url;

-
use std::collections::HashMap;
-
use std::str::FromStr;
-
use std::sync::Mutex;
-
use std::sync::Once;
-

-
use git2::transport::SmartSubtransportStream;
-
use once_cell::sync::Lazy;
-

-
use crate::storage::RemoteId;
-

pub use url::{Url, UrlError};
-

-
/// The map of git smart sub-transport streams. We keep a global map because we have
-
/// no control over how [`git2::transport::register`] instantiates our [`Smart`] transport
-
/// or its underlying streams.
-
static STREAMS: Lazy<Mutex<HashMap<RemoteId, Stream>>> = Lazy::new(Default::default);
-

-
/// The stream associated with a repository.
-
type Stream = Box<dyn SmartSubtransportStream>;
-

-
/// Git transport protocol over an I/O stream.
-
#[derive(Clone)]
-
struct Smart;
-

-
impl git2::transport::SmartSubtransport for Smart {
-
    /// Run a git service on this transport.
-
    ///
-
    /// Based on the URL, which must be a valid [`Url`],
-
    /// we retrieve an underlying stream and return it.
-
    ///
-
    /// We only support the upload-pack service, since only fetches are authorized by the
-
    /// remote.
-
    fn action(
-
        &self,
-
        url: &str,
-
        action: git2::transport::Service,
-
    ) -> Result<Box<dyn git2::transport::SmartSubtransportStream>, git2::Error> {
-
        let url = Url::from_str(url).map_err(|e| git2::Error::from_str(e.to_string().as_str()))?;
-
        let mut streams = STREAMS.lock().expect("lock isn't poisoned");
-

-
        if let Some(stream) = streams.remove(&url.node) {
-
            match action {
-
                git2::transport::Service::UploadPackLs | git2::transport::Service::UploadPack => {}
-
                git2::transport::Service::ReceivePack | git2::transport::Service::ReceivePackLs => {
-
                    return Err(git2::Error::from_str(
-
                        "git-receive-pack is not supported with the custom transport",
-
                    ));
-
                }
-
            }
-
            Ok(stream)
-
        } else {
-
            Err(git2::Error::from_str(&format!(
-
                "node {} does not have an associated stream",
-
                url.node
-
            )))
-
        }
-
    }
-

-
    fn close(&self) -> Result<(), git2::Error> {
-
        Ok(())
-
    }
-
}
-

-
/// Register the radicle transport with `git`.
-
///
-
/// This function can be called more than once. Only one transport will be registered.
-
///
-
pub fn register(node: RemoteId, stream: impl SmartSubtransportStream) {
-
    static REGISTER: Once = Once::new();
-

-
    // Registration is not thread-safe, so make sure we prevent re-entrancy.
-
    REGISTER.call_once(|| unsafe {
-
        git2::transport::register(Url::SCHEME, move |remote| {
-
            git2::transport::Transport::smart(remote, false, Smart)
-
        })
-
        .expect("remote transport registration");
-
    });
-

-
    STREAMS
-
        .lock()
-
        .expect("lock isn't poisoned")
-
        .insert(node, Box::new(stream));
-
}