Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src worker.rs
#![allow(clippy::too_many_arguments)]
mod channels;
mod upload_pack;

pub mod fetch;
pub mod garbage;

use std::path::PathBuf;

use crossbeam_channel as chan;

use radicle::identity::RepoId;
use radicle::node::notifications;
use radicle::node::policy::config as policy;
use radicle::node::policy::config::SeedingPolicy;
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{Storage, cob, crypto};

pub use radicle_protocol::worker::{
    AuthorizationError, FetchError, FetchRequest, FetchResult, UploadError,
};

use crate::runtime::{Handle, thread};
use crate::wire::StreamId;

pub use channels::{ChannelEvent, Channels, ChannelsConfig};

/// Worker pool configuration.
pub struct Config {
    /// Number of worker threads.
    pub capacity: usize,
    /// Git storage.
    pub storage: Storage,
    /// Configuration for performing fetched.
    pub fetch: FetchConfig,
    /// Default policy, if a policy for a specific node or repository was not found.
    pub policy: SeedingPolicy,
    /// Path to the policies database.
    pub policies_db: PathBuf,
}

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
pub struct Task {
    pub fetch: FetchRequest,
    pub stream: StreamId,
    pub channels: Channels,
}

/// Worker response.
#[derive(Debug)]
pub struct TaskResult {
    pub remote: NodeId,
    pub result: FetchResult,
    pub stream: StreamId,
}

#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Public key of the local peer.
    pub local: crypto::PublicKey,
    /// Configuration for `git gc` garbage collection. Defaults to `1
    /// hour ago`.
    pub expiry: garbage::Expiry,
}

/// A worker that replicates git objects.
struct Worker {
    nid: NodeId,
    storage: Storage,
    fetch_config: FetchConfig,
    tasks: chan::Receiver<Task>,
    handle: Handle,
    policies: policy::Config<policy::store::Read>,
    notifications: notifications::StoreWriter,
    cache: cob::cache::StoreWriter,
    db: radicle::node::Database,
}

impl Worker {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
    /// the next task.
    fn run(mut self) -> Result<(), chan::RecvError> {
        loop {
            let task = self.tasks.recv()?;
            self.process(task);
        }
    }

    fn process(
        &mut self,
        Task {
            fetch,
            channels,
            stream,
        }: Task,
    ) {
        let remote = fetch.remote();
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
        let result = self._process(fetch, stream, channels, self.notifications.clone());

        log::trace!(target: "worker", "Sending response back to service..");

        if self
            .handle
            .worker_result(TaskResult {
                remote,
                stream,
                result,
            })
            .is_err()
        {
            log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
        }
    }

    fn _process(
        &mut self,
        fetch: FetchRequest,
        stream: StreamId,
        mut channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
                rid,
                remote,
                refs_at,
                config,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
                let result = self.fetch(rid, remote, refs_at, config, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote, emitter } => {
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");

                let timeout = channels.timeout();
                let (mut stream_r, stream_w) = channels.split();

                let mut iter = gix_packetline::blocking_io::StreamingPeekableIter::new(
                    &mut stream_r,
                    &[gix_packetline::PacketLineRef::Flush],
                    false, /* packet tracing */
                );

                let header = match iter.read_line() {
                    None => {
                        return FetchResult::Responder {
                            rid: None,
                            result: Err(UploadError::PacketLine(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected end of stream while reading upload-pack header",
                            ))),
                        };
                    }
                    Some(Err(e)) => {
                        return FetchResult::Responder {
                            rid: None,
                            result: Err(UploadError::PacketLine(e)),
                        };
                    }
                    Some(Ok(Err(e))) => {
                        return FetchResult::Responder {
                            rid: None,
                            result: Err(UploadError::PacketLine(std::io::Error::new(
                                std::io::ErrorKind::InvalidData,
                                format!("invalid upload-pack header: {e}"),
                            ))),
                        };
                    }
                    Some(Ok(Ok(header))) => header,
                };

                let Some(header) = upload_pack::GitRequest::from_packetline(header) else {
                    return FetchResult::Responder {
                        rid: None,
                        result: Err(UploadError::PacketLine(std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            "failed to parse upload-pack header",
                        ))),
                    };
                };

                log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);

                if let Err(e) = self.is_authorized(remote, header.repo) {
                    return FetchResult::Responder {
                        rid: Some(header.repo),
                        result: Err(e.into()),
                    };
                }

                let result = upload_pack::upload_pack(
                    &self.nid,
                    remote,
                    &self.storage,
                    &emitter,
                    &header,
                    stream_r,
                    stream_w,
                    timeout,
                )
                .map(drop)
                .map_err(UploadError::UploadPack);
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");

                FetchResult::Responder {
                    rid: Some(header.repo),
                    result,
                }
            }
        }
    }

    fn is_authorized(&self, remote: NodeId, rid: RepoId) -> Result<(), AuthorizationError> {
        let policy = self.policies.seed_policy(&rid)?.policy;
        // Check policy first, since if we're blocking then we likely don't have
        // the repository.
        if policy.is_block() {
            return Err(AuthorizationError::Unauthorized(remote, rid));
        }
        let repo = self.storage.repository(rid)?;
        let doc = repo.identity_doc()?;

        if !doc.is_visible_to(&remote.into()) {
            Err(AuthorizationError::Unauthorized(remote, rid))
        } else {
            Ok(())
        }
    }

    fn fetch(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        refs_at: Option<Vec<RefsAt>>,
        fetch_config: radicle_fetch::Config,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig { local, expiry } = &self.fetch_config;
        // N.b. if the `rid` is blocked this will return an error, so
        // we won't continue with any further set up of the fetch.
        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;

        let mut cache = self.cache.clone();
        let handle = fetch::Handle::new(
            rid,
            *local,
            &self.storage,
            allowed,
            blocked,
            channels,
            notifs,
        )?;
        let result = handle.fetch(
            rid,
            &self.storage,
            &mut cache,
            &mut self.db,
            fetch_config,
            remote,
            refs_at,
        )?;

        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
            // N.b. ensure that `git gc` works in debug mode.
            debug_assert!(false, "`git gc` failed: {e}");

            log::debug!(target: "worker", "Failed to run `git gc`: {e}");
        }
        Ok(result)
    }
}

/// A pool of workers. One thread is allocated for each worker.
pub struct Pool {
    pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
}

impl Pool {
    /// Create a new worker pool with the given parameters.
    pub fn with(
        tasks: chan::Receiver<Task>,
        nid: NodeId,
        handle: Handle,
        notifications: notifications::StoreWriter,
        cache: cob::cache::StoreWriter,
        db: radicle::node::Database,
        config: Config,
    ) -> Result<Self, policy::Error> {
        let mut pool = Vec::with_capacity(config.capacity);
        for i in 0..config.capacity {
            let policies =
                policy::Config::new(config.policy, policy::Store::reader(&config.policies_db)?);
            let worker = Worker {
                nid,
                tasks: tasks.clone(),
                handle: handle.clone(),
                storage: config.storage.clone(),
                fetch_config: config.fetch.clone(),
                policies,
                notifications: notifications.clone(),
                cache: cache.clone(),
                db: db.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

            pool.push(thread);
        }
        Ok(Self { pool })
    }

    /// Run the worker pool.
    ///
    /// Blocks until all worker threads have exited.
    pub fn run(self) -> thread::Result<()> {
        for (i, worker) in self.pool.into_iter().enumerate() {
            if let Err(err) = worker.join()? {
                log::trace!(target: "pool", "Worker {i} exited: {err}");
            }
        }
        log::debug!(target: "pool", "Worker pool shutting down..");

        Ok(())
    }
}