Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: improve fetch
Fintan Halpenny committed 3 years ago
commit e937cdd79acca6178e54e365c491f2d9de2b0268
parent 3b015a9325cd77da23ecca0de5e102680fe1fbe9
6 files changed +83 -19
modified radicle-node/src/runtime.rs
@@ -156,6 +156,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
        }

        let pool = worker::Pool::with(
+
            id,
            worker_recv,
            handle.clone(),
            worker::Config {
modified radicle-node/src/service.rs
@@ -1072,6 +1072,7 @@ where
                        return Ok(());
                    }
                }
+

                // Accept the request and instruct the transport to handover the socket to the worker.
                self.reactor.write(peer, Message::FetchOk { rid });
                self.reactor.fetch(peer, rid, FetchDirection::Responder);
@@ -1092,14 +1093,16 @@ where
                }
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

+
                let namespaces = if let Ok(_repo) = self.storage.repository(rid) {
+
                    // FIXME(finto): using remotes breaks test_gossip_during_fetch, so we use default for now
+
                    Namespaces::default()
+
                } else {
+
                    Namespaces::default()
+
                };
+

                // Instruct the transport to handover the socket to the worker.
-
                self.reactor.fetch(
-
                    peer,
-
                    rid,
-
                    FetchDirection::Initiator {
-
                        namespaces: Namespaces::default(),
-
                    },
-
                );
+
                self.reactor
+
                    .fetch(peer, rid, FetchDirection::Initiator { namespaces });
            }
            (session::State::Connecting { .. }, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
@@ -1198,6 +1201,21 @@ where
                // SAFETY: `REF_REMOTE_LIMIT` is greater than 1, thus the bounded vec can hold at least
                // one remote.
                .unwrap(),
+
            Namespaces::Many(pks) => {
+
                for remote_id in pks.into_iter() {
+
                    if refs
+
                        .push((*remote_id, repo.remote(&remote_id)?.refs.unverified()))
+
                        .is_err()
+
                    {
+
                        warn!(
+
                            target: "service",
+
                            "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
+
                            REF_REMOTE_LIMIT,
+
                        );
+
                        break;
+
                    }
+
                }
+
            }
        }

        let msg = AnnouncementMessage::from(RefsAnnouncement {
modified radicle-node/src/service/session.rs
@@ -108,6 +108,8 @@ pub enum Error {
    Timeout,
    #[error("handshake error")]
    Handshake(String),
+
    #[error("failed to inspect remotes for fetch: {0}")]
+
    Remotes(#[from] storage::refs::Error),
}

/// A peer session. Each connected peer will have one session.
modified radicle-node/src/test/simulator.rs
@@ -679,6 +679,7 @@ fn fetch<W: WriteRepository>(
    let namespace = match namespaces.into() {
        Namespaces::All => None,
        Namespaces::One(ns) => Some(ns),
+
        Namespaces::Many(ns) => Some(ns.head),
    };
    let mut updates = Vec::new();
    let mut callbacks = git::RemoteCallbacks::new();
modified radicle-node/src/worker.rs
@@ -7,7 +7,7 @@ use cyphernet::Ecdh;
use netservices::tunnel::Tunnel;
use netservices::{NetSession, SplitIo};

-
use radicle::crypto::Signer;
+
use radicle::crypto::{PublicKey, Signer};
use radicle::identity::{Id, IdentityError};
use radicle::storage::{Namespaces, ReadRepository, RefUpdate, WriteRepository, WriteStorage};
use radicle::{git, Storage};
@@ -37,6 +37,8 @@ pub struct Config {
/// Error returned by fetch.
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
+
    #[error("the 'git fetch' command failed with exit code '{code}'")]
+
    CommandFailed { code: i32 },
    #[error(transparent)]
    Git(#[from] git::raw::Error),
    #[error(transparent)]
@@ -73,6 +75,7 @@ pub struct TaskResult<G: Signer + Ecdh> {

/// A worker that replicates git objects.
struct Worker<G: Signer + Ecdh> {
+
    local: PublicKey,
    storage: Storage,
    tasks: chan::Receiver<Task<G>>,
    daemon: net::SocketAddr,
@@ -197,15 +200,22 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                // a state we can't roll back.
                cmd.arg("--prune");
            }
+
            Namespaces::Many(_) => {
+
                // Same case as All
+
            }
        }

        if self.atomic {
            // Enable atomic fetch. Only works with Git 2.31 and later.
            cmd.arg("--atomic");
        }
+

+
        // Ignore our own remote when fetching
+
        let mut fetchspecs = namespaces.as_fetchspecs();
+
        fetchspecs.push(format!("^refs/namespaces/{}/*", self.local));
+

        cmd.arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
-
            // FIXME: We need to omit our own namespace from this refspec in case we're fetching '*'.
-
            .arg(namespaces.as_fetchspec())
+
            .args(&fetchspecs)
            .stdout(process::Stdio::piped())
            .stderr(process::Stdio::piped())
            .stdin(process::Stdio::piped());
@@ -224,16 +234,18 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;

        // TODO: Parse fetch output to return updates.
-
        if child.wait()?.success() {
+
        let result = child.wait()?;
+
        if result.success() {
            log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
+
            let head = repo.set_head()?;
+
            log::debug!(target: "worker", "Head for {} set to {head}", rid);
+
            Ok(vec![])
        } else {
            log::error!(target: "worker", "Fetch for {} failed", rid);
+
            Err(FetchError::CommandFailed {
+
                code: result.code().unwrap_or(1),
+
            })
        }
-
        let head = repo.set_head()?;
-

-
        log::debug!(target: "worker", "Head for {} set to {head}", rid);
-

-
        Ok(vec![])
    }

    fn upload_pack(
@@ -314,6 +326,7 @@ pub struct Pool {
impl Pool {
    /// Create a new worker pool with the given parameters.
    pub fn with<G: Signer + Ecdh + 'static>(
+
        local: PublicKey,
        tasks: chan::Receiver<Task<G>>,
        handle: Handle<G>,
        config: Config,
@@ -321,6 +334,7 @@ impl Pool {
        let mut pool = Vec::with_capacity(config.capacity);
        for _ in 0..config.capacity {
            let worker = Worker {
+
                local,
                tasks: tasks.clone(),
                handle: handle.clone(),
                storage: config.storage.clone(),
modified radicle/src/storage.rs
@@ -36,13 +36,35 @@ pub enum Namespaces {
    All,
    /// A single namespace, by public key.
    One(PublicKey),
+
    /// Many namespaces, by public keys.
+
    Many(NonEmpty<PublicKey>),
}

impl Namespaces {
-
    pub fn as_fetchspec(&self) -> String {
+
    pub fn remotes<R>(repo: &R) -> Result<Option<Self>, refs::Error>
+
    where
+
        R: ReadRepository,
+
    {
+
        Ok(NonEmpty::collect(repo.remotes()?.keys().copied()).map(Self::Many))
+
    }
+

+
    pub fn delegates<R>(repo: &R) -> Result<Self, IdentityError>
+
    where
+
        R: ReadRepository,
+
    {
+
        Ok(Self::Many(repo.delegates()?.map(PublicKey::from)))
+
    }
+

+
    pub fn as_fetchspecs(&self) -> Vec<String> {
        match self {
-
            Self::All => String::from("refs/namespaces/*:refs/namespaces/*"),
-
            Self::One(pk) => format!("refs/namespaces/{pk}/refs/*:refs/namespaces/{pk}/refs/*"),
+
            Self::All => vec![String::from("refs/namespaces/*:refs/namespaces/*")],
+
            Self::One(pk) => vec![format!(
+
                "refs/namespaces/{pk}/refs/*:refs/namespaces/{pk}/refs/*"
+
            )],
+
            Self::Many(pks) => pks
+
                .iter()
+
                .map(|pk| format!("refs/namespaces/{pk}/refs/*:refs/namespaces/{pk}/refs/*"))
+
                .collect(),
        }
    }
}
@@ -53,6 +75,12 @@ impl From<PublicKey> for Namespaces {
    }
}

+
impl From<NonEmpty<PublicKey>> for Namespaces {
+
    fn from(pks: NonEmpty<PublicKey>) -> Self {
+
        Self::Many(pks)
+
    }
+
}
+

/// Storage error.
#[derive(Error, Debug)]
pub enum Error {