| |
pub storage: Storage,
|
| |
/// Configuration for performing fetched.
|
| |
pub fetch: FetchConfig,
|
| + |
/// Default policy, if a policy for a specific node or repository was not found.
|
| + |
pub policy: Policy,
|
| + |
/// Default scope, if a scope for a specific repository was not found.
|
| + |
pub scope: policy::Scope,
|
| + |
/// Path to the policies database.
|
| + |
pub policies_db: PathBuf,
|
| |
}
|
| |
|
| |
/// Error returned by fetch.
|
| |
|
| |
#[derive(Debug, Clone)]
|
| |
pub struct FetchConfig {
|
| - |
/// Default policy, if a policy for a specific node or repository was not found.
|
| - |
pub policy: Policy,
|
| - |
/// Default scope, if a scope for a specific repository was not found.
|
| - |
pub scope: policy::Scope,
|
| - |
/// Path to the policies database.
|
| - |
pub policies_db: PathBuf,
|
| |
/// Data limits when fetching from a remote.
|
| |
pub limit: FetchLimit,
|
| |
/// Public key of the local peer.
|
| |
}
|
| |
|
| |
fn is_authorized(&self, remote: NodeId, rid: Id) -> Result<(), UploadError> {
|
| - |
let policy = {
|
| - |
let policy = self.fetch_config.policy;
|
| - |
let scope = self.fetch_config.scope;
|
| - |
let db = &self.fetch_config.policies_db;
|
| - |
let policies = policy::Config::new(policy, scope, policy::Store::reader(db)?);
|
| - |
policies.repo_policy(&rid)?.policy
|
| - |
};
|
| + |
let policy = self.policies.repo_policy(&rid)?.policy;
|
| |
let repo = self.storage.repository(rid)?;
|
| |
let doc = repo.canonical_identity_doc()?;
|
| |
if !doc.is_visible_to(&remote) || policy == Policy::Block {
|
| |
channels: channels::ChannelsFlush,
|
| |
) -> Result<fetch::FetchResult, FetchError> {
|
| |
let FetchConfig {
|
| - |
policy,
|
| - |
scope,
|
| - |
policies_db,
|
| |
limit,
|
| |
local,
|
| |
expiry,
|
| |
} = &self.fetch_config;
|
| - |
let policies = policy::Config::new(*policy, *scope, policy::Store::reader(policies_db)?);
|
| |
// N.b. if the `rid` is blocked this will return an error, so
|
| |
// we won't continue with any further set up of the fetch.
|
| - |
let allowed = radicle_fetch::Allowed::from_config(rid, &policies)?;
|
| - |
let blocked = radicle_fetch::BlockList::from_config(&policies)?;
|
| + |
let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
|
| + |
let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;
|
| |
|
| |
let handle = fetch::Handle::new(rid, *local, &self.storage, allowed, blocked, channels)?;
|
| |
let result = handle.fetch(rid, &self.storage, *limit, remote, refs_at)?;
|
| |
|
| |
impl Pool {
|
| |
/// Create a new worker pool with the given parameters.
|
| - |
pub fn with(tasks: chan::Receiver<Task>, nid: NodeId, handle: Handle, config: Config) -> Self {
|
| + |
pub fn with(
|
| + |
tasks: chan::Receiver<Task>,
|
| + |
nid: NodeId,
|
| + |
handle: Handle,
|
| + |
config: Config,
|
| + |
) -> Result<Self, policy::Error> {
|
| |
let mut pool = Vec::with_capacity(config.capacity);
|
| |
for i in 0..config.capacity {
|
| + |
let policies = policy::Config::new(
|
| + |
config.policy,
|
| + |
config.scope,
|
| + |
policy::Store::reader(&config.policies_db)?,
|
| + |
);
|
| |
let worker = Worker {
|
| |
nid,
|
| |
tasks: tasks.clone(),
|
| |
handle: handle.clone(),
|
| |
storage: config.storage.clone(),
|
| |
fetch_config: config.fetch.clone(),
|
| + |
policies,
|
| |
};
|
| |
let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());
|
| |
|
| |
pool.push(thread);
|
| |
}
|
| - |
Self { pool }
|
| + |
Ok(Self { pool })
|
| |
}
|
| |
|
| |
/// Run the worker pool.
|