Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Use a pool of threads in pkg repo.
Julien Laffaye committed 13 years ago
commit f892aeb6bea1591ea1b7ece5ec391426d7ae1760
parent 02f8a13
4 files changed +222 -52
modified libpkg/pkg_repo.c
@@ -43,6 +43,7 @@
#include "private/utils.h"
#include "private/pkg.h"
#include "private/pkgdb.h"
+
#include "private/thd_repo.h"

/* The package repo schema major revision */
#define REPO_SCHEMA_MAJOR 2
@@ -543,24 +544,21 @@ pkg_create_repo(char *path, bool force,
    void (progress)(struct pkg *pkg, void *data), void *data)
{
	FTS *fts = NULL;
-
	FTSENT *ent = NULL;
+
	struct thd_data thd_data;
+
	int num_workers = 6;
+
	pthread_t *tids = NULL;

-
	struct pkg *pkg = NULL;
	struct pkg_dep *dep = NULL;
	struct pkg_category *category = NULL;
	struct pkg_license *license = NULL;
	struct pkg_option *option = NULL;
	struct pkg_shlib *shlib = NULL;
-
	struct sbuf *manifest = NULL;
-
	char *ext = NULL;

	sqlite3 *sqlite = NULL;

	int64_t package_id;
	char *errmsg = NULL;
	int retcode = EPKG_OK;
-
	char *pkg_path;
-
	char cksum[SHA256_DIGEST_LENGTH * 2 +1];
	int ret;

	char *repopath[2];
@@ -616,43 +614,52 @@ pkg_create_repo(char *path, bool force,
	if ((retcode = initialize_prepared_statements(sqlite)) != EPKG_OK)
		goto cleanup;

-
	manifest = sbuf_new_auto();
-
	while ((ent = fts_read(fts)) != NULL) {
+
	thd_data.root_path = path;
+
	thd_data.stop = false;
+
	thd_data.fts = fts;
+
	pthread_mutex_init(&thd_data.fts_m, NULL);
+
	STAILQ_INIT(&thd_data.results);
+
	thd_data.thd_finished = 0;
+
	pthread_mutex_init(&thd_data.results_m, NULL);
+
	pthread_cond_init(&thd_data.has_result, NULL);
+

+
	/* Launch workers */
+
	tids = calloc(num_workers, sizeof(pthread_t));
+
	for (int i = 0; i < num_workers; i++) {
+
		pthread_create(&tids[i], NULL, (void *)&read_pkg_file, &thd_data);
+
	}
+

+
	for (;;) {
+
		struct pkg_result *r;
+

		const char *name, *version, *origin, *comment, *desc;
		const char *arch, *maintainer, *www, *prefix;
		int64_t flatsize;
		lic_t licenselogic;

-
		/* skip everything that is not a file */
-
		if (ent->fts_info != FTS_F)
-
			continue;
-

-
		ext = strrchr(ent->fts_name, '.');
-

-
		if (ext == NULL)
-
			continue;
-

-
		if (strcmp(ext, ".tgz") != 0 &&
-
				strcmp(ext, ".tbz") != 0 &&
-
				strcmp(ext, ".txz") != 0 &&
-
				strcmp(ext, ".tar") != 0)
-
			continue;
+
		pthread_mutex_lock(&thd_data.results_m);
+
		while ((r = STAILQ_FIRST(&thd_data.results)) == NULL) {
+
			if (thd_data.thd_finished == num_workers) {
+
				break;
+
			}
+
			pthread_cond_wait(&thd_data.has_result, &thd_data.results_m);
+
		}
+
		if (r != NULL) {
+
			STAILQ_REMOVE_HEAD(&thd_data.results, next);
+
		}
+
		pthread_mutex_unlock(&thd_data.results_m);
+
		if (r == NULL) {
+
			break;
+
		}

-
		if (strcmp(ent->fts_name, "repo.txz") == 0)
+
		if (r->retcode != EPKG_OK) {
			continue;
-

-
		pkg_path = ent->fts_path;
-
		pkg_path += strlen(path);
-
		while (pkg_path[0] == '/')
-
			pkg_path++;
-

-
		cksum[0] = '\0';
-
		sha256_file(ent->fts_accpath, cksum);
+
		}

		/* do not add if package if already in repodb
		   (possibly at a different pkg_path) */

-
		if (run_prepared_statement(EXISTS, cksum) != SQLITE_ROW) {
+
		if (run_prepared_statement(EXISTS, r->cksum) != SQLITE_ROW) {
			ERROR_SQLITE(sqlite);
			goto cleanup;
		}
@@ -660,15 +667,10 @@ pkg_create_repo(char *path, bool force,
			continue;
		}

-
		if (pkg_open(&pkg, ent->fts_accpath, manifest) != EPKG_OK) {
-
			retcode = EPKG_WARN;
-
			continue;
-
		}
-

		if (progress != NULL)
-
			progress(pkg, data);
+
			progress(r->pkg, data);

-
		pkg_get(pkg, PKG_ORIGIN, &origin, PKG_NAME, &name,
+
		pkg_get(r->pkg, PKG_ORIGIN, &origin, PKG_NAME, &name,
		    PKG_VERSION, &version, PKG_COMMENT, &comment,
		    PKG_DESC, &desc, PKG_ARCH, &arch,
		    PKG_MAINTAINER, &maintainer, PKG_WWW, &www,
@@ -678,11 +680,11 @@ pkg_create_repo(char *path, bool force,
	try_again:
		if ((ret = run_prepared_statement(PKG, origin, name, version,
		    comment, desc, arch, maintainer, www, prefix,
-
		    ent->fts_statp->st_size, flatsize, (int64_t)licenselogic, cksum,
-
		    pkg_path)) != SQLITE_DONE) {
+
		    r->size, flatsize, (int64_t)licenselogic, r->cksum,
+
		    r->path)) != SQLITE_DONE) {
			if (ret == SQLITE_CONSTRAINT) {
				switch(maybe_delete_conflicting(origin,
-
				    version, pkg_path)) {
+
				    version, r->path)) {
				case EPKG_FATAL: /* sqlite error */
					ERROR_SQLITE(sqlite);
					retcode = EPKG_FATAL;
@@ -705,7 +707,7 @@ pkg_create_repo(char *path, bool force,
		package_id = sqlite3_last_insert_rowid(sqlite);

		dep = NULL;
-
		while (pkg_deps(pkg, &dep) == EPKG_OK) {
+
		while (pkg_deps(r->pkg, &dep) == EPKG_OK) {
			if (run_prepared_statement(DEPS,
			    pkg_dep_origin(dep),
			    pkg_dep_name(dep),
@@ -718,7 +720,7 @@ pkg_create_repo(char *path, bool force,
		}

		category = NULL;
-
		while (pkg_categories(pkg, &category) == EPKG_OK) {
+
		while (pkg_categories(r->pkg, &category) == EPKG_OK) {
			const char *cat_name = pkg_category_name(category);

			ret = run_prepared_statement(CAT1, cat_name);
@@ -734,7 +736,7 @@ pkg_create_repo(char *path, bool force,
		}

		license = NULL;
-
		while (pkg_licenses(pkg, &license) == EPKG_OK) {
+
		while (pkg_licenses(r->pkg, &license) == EPKG_OK) {
			const char *lic_name = pkg_license_name(license);

			ret = run_prepared_statement(LIC1, lic_name);
@@ -748,7 +750,7 @@ pkg_create_repo(char *path, bool force,
			}
		}
		option = NULL;
-
		while (pkg_options(pkg, &option) == EPKG_OK) {
+
		while (pkg_options(r->pkg, &option) == EPKG_OK) {
			if (run_prepared_statement(OPTS,
			    pkg_option_opt(option),
			    pkg_option_value(option),
@@ -760,7 +762,7 @@ pkg_create_repo(char *path, bool force,
		}

		shlib = NULL;
-
		while (pkg_shlibs(pkg, &shlib) == EPKG_OK) {
+
		while (pkg_shlibs(r->pkg, &shlib) == EPKG_OK) {
			const char *shlib_name = pkg_shlib_name(shlib);

			ret = run_prepared_statement(SHLIB1, shlib_name);
@@ -774,6 +776,9 @@ pkg_create_repo(char *path, bool force,
				goto cleanup;
			}
		}
+

+
		pkg_free(r->pkg);
+
		free(r);
	}

	if (sqlite3_exec(sqlite, "COMMIT;", NULL, NULL, &errmsg) != SQLITE_OK) {
@@ -782,12 +787,24 @@ pkg_create_repo(char *path, bool force,
	}

	cleanup:
+
	// Cancel running threads
+
	if (retcode != EPKG_OK) {
+
		pthread_mutex_lock(&thd_data.fts_m);
+
		thd_data.stop = true;
+
		pthread_mutex_unlock(&thd_data.fts_m);
+
	}
+

+
	// Join on threads to release thread IDs
+
	if (tids != NULL) {
+
		for (int i = 0; i < num_workers; i++) {
+
			pthread_join(tids[i], NULL);
+
		}
+
		free(tids);
+
	}
+

	if (fts != NULL)
		fts_close(fts);

-
	if (pkg != NULL)
-
		pkg_free(pkg);
-

	finalize_prepared_statements();

	if (sqlite != NULL)
@@ -796,13 +813,105 @@ pkg_create_repo(char *path, bool force,
	if (errmsg != NULL)
		sqlite3_free(errmsg);

-
	sbuf_free(manifest);
-

	sqlite3_shutdown();

	return (retcode);
}

+
void
+
read_pkg_file(void *data)
+
{
+
	struct thd_data *d = (struct thd_data*) data;
+
	struct pkg_result *r;
+

+
	FTSENT *fts_ent = NULL;
+
	char fts_accpath[MAXPATHLEN + 1];
+
	char fts_path[MAXPATHLEN + 1];
+
	char fts_name[MAXPATHLEN + 1];
+
	off_t st_size;
+
	int fts_info;
+

+
	struct sbuf *manifest = sbuf_new_auto();
+
	char *ext = NULL;
+
	char *pkg_path;
+

+
	for (;;) {
+
		fts_ent = NULL;
+

+
		/*
+
		 * Get a file to read from.
+
		 * Copy the data we need from the fts entry localy because as soon as
+
		 * we unlock the fts_m mutex, we can not access it.
+
		 */
+
		pthread_mutex_lock(&d->fts_m);
+
		if (d->stop == false) {
+
			fts_ent = fts_read(d->fts);
+
		}
+
		if (fts_ent != NULL) {
+
			strlcpy(fts_accpath, fts_ent->fts_accpath, sizeof(fts_accpath));
+
			strlcpy(fts_path, fts_ent->fts_path, sizeof(fts_path));
+
			strlcpy(fts_name, fts_ent->fts_name, sizeof(fts_name));
+
			st_size = fts_ent->fts_statp->st_size;
+
			fts_info = fts_ent->fts_info;
+
		}
+
		pthread_mutex_unlock(&d->fts_m);
+

+
		// There is no more jobs, exit the main loop.
+
		if (fts_ent == NULL)
+
				break;
+

+
		/* skip everything that is not a file */
+
		if (fts_info != FTS_F)
+
			continue;
+

+
		ext = strrchr(fts_name, '.');
+

+
		if (ext == NULL)
+
			continue;
+

+
		if (strcmp(ext, ".tgz") != 0 &&
+
				strcmp(ext, ".tbz") != 0 &&
+
				strcmp(ext, ".txz") != 0 &&
+
				strcmp(ext, ".tar") != 0)
+
			continue;
+

+
		if (strcmp(fts_name, "repo.txz") == 0)
+
			continue;
+

+
		pkg_path = fts_path;
+
		pkg_path += strlen(d->root_path);
+
		while (pkg_path[0] == '/')
+
			pkg_path++;
+

+
		r = calloc(1, sizeof(struct pkg_result));
+
		strlcpy(r->path, pkg_path, sizeof(r->path));
+
		r->size = st_size;
+

+
		sha256_file(fts_accpath, r->cksum);
+

+
		if (pkg_open(&r->pkg, fts_accpath, manifest) != EPKG_OK) {
+
			r->retcode = EPKG_WARN;
+
		}
+

+
		/* Add result to the FIFO and notify */
+
		pthread_mutex_lock(&d->results_m);
+
		STAILQ_INSERT_TAIL(&d->results, r, next);
+
		pthread_cond_signal(&d->has_result);
+
		pthread_mutex_unlock(&d->results_m);
+
	}
+

+
	/*
+
	 * This thread is about to exit.
+
	 * Notify the main thread that we are done.
+
	 */
+
	pthread_mutex_lock(&d->results_m);
+
	d->thd_finished++;
+
	pthread_cond_signal(&d->has_result);
+
	pthread_mutex_unlock(&d->results_m);
+

+
	sbuf_free(manifest);
+
}
+

int
pkg_finish_repo(char *path, pem_password_cb *password_cb, char *rsa_key_path)
{
added libpkg/private/thd_repo.h
@@ -0,0 +1,59 @@
+
/*
+
 * Copyright (c) 2012 Julien Laffaye <jlaffaye@FreeBSD.org>
+
 * All rights reserved.
+
 * 
+
 * Redistribution and use in source and binary forms, with or without
+
 * modification, are permitted provided that the following conditions
+
 * are met:
+
 * 1. Redistributions of source code must retain the above copyright
+
 *    notice, this list of conditions and the following disclaimer
+
 *    in this position and unchanged.
+
 * 2. Redistributions in binary form must reproduce the above copyright
+
 *    notice, this list of conditions and the following disclaimer in the
+
 *    documentation and/or other materials provided with the distribution.
+
 * 
+
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR
+
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+
 * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT,
+
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
 */
+

+
#ifndef _PKG_THD_REPO_H
+
#define _PKG_THD_REPO_H
+

+
#include <sys/queue.h>
+
#include <sys/types.h>
+
#include <pthread.h>
+

+
struct pkg_result {
+
	struct pkg *pkg;
+
	char path[MAXPATHLEN + 1];
+
	char cksum[SHA256_DIGEST_LENGTH * 2 + 1];
+
	off_t size;
+
	int retcode; // to pass errors
+
	STAILQ_ENTRY(pkg_result) next;
+
};
+

+
struct thd_data {
+
	char *root_path;
+

+
	FTS *fts;
+
	bool stop;
+
	pthread_mutex_t fts_m; // protects `fts' and `stop'
+

+
	/* results is used as a FIFO */
+
	STAILQ_HEAD(results, pkg_result) results;
+
	int thd_finished;
+
	pthread_mutex_t results_m; // protects `results' an `thd_finished'
+
	pthread_cond_t has_result; // signal that there is at least one result
+
};
+

+
void read_pkg_file(void *);
+

+
#endif
modified pkg-static/Makefile
@@ -10,6 +10,7 @@ LDADD_STATIC= -L${.OBJDIR}/../external/sqlite \
		-larchive \
		-lsbuf \
		-lfetch \
+
		-lpthread \
		-lelf \
		-lssl \
		-lcrypto \
modified pkg/Makefile
@@ -48,6 +48,7 @@ LDADD+= -L${.OBJDIR}/../libpkg \
		-lpkg \
		-lutil \
		-ljail \
+
		-lpthread \
		${LDADD_STATIC}

WARNS?=		6