Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Convert pkg repo from threads to processes.
Vsevolod Stakhov committed 11 years ago
commit 654c36fe331bb6360443d0ee92edeac06f33dc98
parent 22e5002
1 file changed +393 -103
modified libpkg/pkg_repo_create.c
@@ -32,6 +32,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sysctl.h>
+
#include <sys/wait.h>

#include <archive_entry.h>
#include <assert.h>
@@ -45,6 +46,9 @@
#include <sysexits.h>
#include <unistd.h>
#include <errno.h>
+
#include <fcntl.h>
+
#include <math.h>
+
#include <poll.h>

#include "pkg.h"
#include "private/event.h"
@@ -264,27 +268,327 @@ out:
	return;
}

+
/*
 * One package archive found while walking the repository tree.
 *
 * The string members are private heap copies made by
 * pkg_create_repo_fts_new() and released by pkg_create_repo_fts_free().
 * Items are chained into a singly linked list through `next`.
 */
struct pkg_fts_item {
	/* Copy of the FTSENT's fts_accpath (path used to open the file). */
	char *fts_accpath;
	/* fts_path with the root prefix and any leading '/' skipped. */
	char *pkg_path;
	/* Copy of the FTSENT's fts_name. */
	char *fts_name;
	/* st_size taken from the FTSENT's stat data. */
	off_t fts_size;
	/* Copy of the FTSENT's fts_info. */
	int fts_info;
	/* Next element of the list, or NULL. */
	struct pkg_fts_item *next;
};
+

+
static struct pkg_fts_item*
+
pkg_create_repo_fts_new(FTSENT *fts, const char *root_path)
+
{
+
	struct pkg_fts_item *item;
+
	char *pkg_path;
+

+
	item = malloc(sizeof(*item));
+
	if (item != NULL) {
+
		item->fts_accpath = strdup(fts->fts_accpath);
+
		item->fts_name = strdup(fts->fts_name);
+
		item->fts_size = fts->fts_statp->st_size;
+
		item->fts_info = fts->fts_info;
+

+
		pkg_path = fts->fts_path;
+
		pkg_path += strlen(root_path);
+
		while (pkg_path[0] == '/')
+
			pkg_path++;
+

+
		item->pkg_path = strdup(pkg_path);
+
	}
+
	else {
+
		pkg_emit_errno("malloc", "struct pkg_fts_item");
+
	}
+

+
	return (item);
+
}
+

+
static void
+
pkg_create_repo_fts_free(struct pkg_fts_item *item)
+
{
+
	free(item->fts_accpath);
+
	free(item->pkg_path);
+
	free(item->fts_name);
+
	free(item);
+
}
+

+
static int
+
pkg_create_repo_read_fts(struct pkg_fts_item **items, FTS *fts,
+
	const char *repopath, size_t *plen)
+
{
+
	FTSENT *fts_ent;
+
	struct pkg_fts_item *fts_cur;
+
	char *ext;
+

+
	errno = 0;
+

+
	while ((fts_ent = fts_read(fts)) != NULL) {
+
		/* Skip everything that is not a file */
+
		if (fts_ent->fts_info != FTS_F)
+
			continue;
+

+
		ext = strrchr(fts_ent->fts_name, '.');
+

+
		if (ext == NULL)
+
			continue;
+

+
		if (strcmp(ext, ".tgz") != 0 &&
+
						strcmp(ext, ".tbz") != 0 &&
+
						strcmp(ext, ".txz") != 0 &&
+
						strcmp(ext, ".tar") != 0)
+
			continue;
+

+
		*ext = '\0';
+

+
		if (strcmp(fts_ent->fts_name, repo_db_archive) == 0 ||
+
						strcmp(fts_ent->fts_name, repo_packagesite_archive) == 0 ||
+
						strcmp(fts_ent->fts_name, repo_filesite_archive) == 0 ||
+
						strcmp(fts_ent->fts_name, repo_digests_archive) == 0 ||
+
						strcmp(fts_ent->fts_name, repo_conflicts_archive) == 0) {
+
			*ext = '.';
+
			continue;
+
		}
+

+
		*ext = '.';
+
		fts_cur = pkg_create_repo_fts_new(fts_ent, repopath);
+
		if (fts_cur == NULL)
+
			return (EPKG_FATAL);
+

+
		LL_PREPEND(*items, fts_cur);
+
		(*plen) ++;
+
	}
+

+
	if (errno != 0) {
+
		pkg_emit_errno("fts_read", "pkg_create_repo_read_fts");
+
		return (EPKG_FATAL);
+
	}
+

+
	return (EPKG_OK);
+
}
+

+
static int
+
pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
+
	const char *mlfile, const char *flfile, int pip)
+
{
+
	pid_t pid;
+
	int mfd, ffd;
+
	bool read_files = (flfile != NULL);
+
	int flags, ret = EPKG_OK;
+
	size_t cur_job = 0;
+
	struct pkg_fts_item *cur;
+
	struct pkg *pkg = NULL;
+
	struct pkg_manifest_key *keys = NULL;
+
	char checksum[SHA256_DIGEST_LENGTH * 2 + 1], *mdigest;
+
	char digestbuf[1024];
+

+
	struct sbuf *b = sbuf_new_auto();
+

+
	mfd = open(mlfile, O_APPEND|O_CREAT, 00644);
+
	if (mfd == -1) {
+
		pkg_emit_errno("pkg_create_repo_worker", "open");
+
		return (EPKG_FATAL);
+
	}
+

+
	if (read_files) {
+
		ffd = open(mlfile, O_APPEND|O_CREAT, 00644);
+
		if (ffd == -1) {
+
			pkg_emit_errno("pkg_create_repo_worker", "open");
+
			return (EPKG_FATAL);
+
		}
+
	}
+

+
	pid = fork();
+
	switch(pid) {
+
	case -1:
+
		pkg_emit_errno("pkg_create_repo_worker", "fork");
+
		return (EPKG_FATAL);
+
		break;
+
	case 0:
+
		close(mfd);
+
		if (read_files)
+
			close(ffd);
+

+
		return (EPKG_OK);
+
		break;
+
	}
+

+
	if (read_files)
+
		flags = PKG_OPEN_MANIFEST_ONLY;
+
	else
+
		flags = PKG_OPEN_MANIFEST_ONLY | PKG_OPEN_MANIFEST_COMPACT;
+

+
	LL_FOREACH(start, cur) {
+
		if (cur_job >= nelts)
+
			break;
+

+
		if (pkg_open(&pkg, cur->fts_accpath, keys, flags) == EPKG_OK) {
+
			int r;
+
			off_t mpos, fpos = 0;
+
			size_t mlen;
+
			const char *origin;
+

+
			sha256_file(cur->fts_accpath, checksum);
+
			pkg_set(pkg, PKG_CKSUM, checksum,
+
				PKG_REPOPATH, cur->pkg_path,
+
				PKG_PKGSIZE, cur->fts_size);
+
			pkg_get(pkg, PKG_ORIGIN, &origin);
+

+

+
			/*
+
			 * TODO: use pkg_checksum for new manifests
+
			 */
+
			sbuf_clear(b);
+
			pkg_emit_manifest_sbuf(pkg, b, PKG_MANIFEST_EMIT_COMPACT, &mdigest);
+
			mlen = sbuf_len(b);
+
			sbuf_finish(b);
+

+
			if (flock(mfd, LOCK_EX) == -1) {
+
				pkg_emit_errno("pkg_create_repo_worker", "flock");
+
				ret = EPKG_FATAL;
+
				goto cleanup;
+
			}
+

+
			mpos = lseek(mfd, 0, SEEK_END);
+

+
			if (write(mfd, sbuf_data(b), sbuf_len(b)) == -1) {
+
				pkg_emit_errno("pkg_create_repo_worker", "write");
+
				ret = EPKG_FATAL;
+
				flock(mfd, LOCK_UN);
+
				goto cleanup;
+
			}
+

+
			flock(mfd, LOCK_UN);
+

+
			if (read_files) {
+
				FILE *fl;
+

+
				if (flock(ffd, LOCK_EX) == -1) {
+
					pkg_emit_errno("pkg_create_repo_worker", "flock");
+
					ret = EPKG_FATAL;
+
					goto cleanup;
+
				}
+
				fpos = lseek(ffd, 0, SEEK_END);
+
				fl = fdopen(ffd, "a");
+
				pkg_emit_filelist(pkg, fl);
+
				fclose(fl);
+

+
				flock(ffd, LOCK_UN);
+
			}
+

+
			r = snprintf(digestbuf, sizeof(digestbuf), "%s:%s:%ld:%ld:%ld\n",
+
				origin, mdigest,
+
				(long)mpos,
+
				(long)fpos,
+
				(long)mlen);
+

+
			write(pip, digestbuf, r);
+
		}
+
		cur_job ++;
+
	}
+

+
cleanup:
+
	pkg_manifest_keys_free(keys);
+

+
	close(pip);
+
	close(mfd);
+
	if (read_files)
+
		close(ffd);
+

+
	exit(ret);
+
}
+

+
static int
+
pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
+
{
+
	struct digest_list_entry *dig;
+
	char buf[1024];
+
	int r, i, start;
+
	enum {
+
		s_set_origin = 0,
+
		s_set_digest,
+
		s_set_mpos,
+
		s_set_fpos,
+
		s_set_mlen
+
	} state = 0;
+

+
	for (;;) {
+
		r = read(fd, buf, sizeof(buf));
+

+
		if (r == -1) {
+
			if (errno == EINTR)
+
				continue;
+

+
			pkg_emit_errno("pkg_create_repo_read_pipe", "read");
+
			return (EPKG_FATAL);
+
		}
+
		else if (r == 0)
+
			return (EPKG_END);
+

+
		break;
+
	}
+

+
	/*
+
	 * XXX: can parse merely full lines
+
	 */
+
	start = 0;
+
	for (i = 0; i < r; i ++) {
+
		if (buf[i] == ':') {
+
			switch(state) {
+
			case s_set_origin:
+
				dig = malloc(sizeof(*dig));
+
				dig->origin = malloc(i - start + 1);
+
				strlcpy(dig->origin, &buf[start], i - start + 1);
+
				break;
+
			case s_set_digest:
+
				dig->digest = malloc(i - start + 1);
+
				strlcpy(dig->digest, &buf[start], i - start + 1);
+
				break;
+
			case s_set_mpos:
+
				dig->manifest_pos = strtol(&buf[start], NULL, 10);
+
				break;
+
			case s_set_fpos:
+
				dig->files_pos = strtol(&buf[start], NULL, 10);
+
				break;
+
			case s_set_mlen:
+
				/* Record should actually not finish with ':' */
+
				dig->manifest_length = strtol(&buf[start], NULL, 10);
+
				break;
+
			}
+
			start = i + 1;
+
		}
+
		else if (buf[i] == '\n') {
+
			dig->manifest_length = strtol(&buf[start], NULL, 10);
+
			DL_APPEND(*dlist, dig);
+
			break;
+
		}
+
	}
+

+
	return (EPKG_OK);
+
}
+

int
pkg_create_repo(char *path, const char *output_dir, bool filelist,
		void (progress)(struct pkg *pkg, void *data), void *data)
{
	FTS *fts = NULL;
-
	struct thd_data thd_data;
+
	struct pkg_fts_item *fts_items = NULL, *fts_cur;
+

	struct pkg_conflict *c, *ctmp;
	struct pkg_conflict_bulk *conflicts = NULL, *curcb, *tmpcb;
-
	int num_workers;
-
	size_t len;
-
	pthread_t *tids = NULL;
+
	int num_workers, i;
+
	size_t len, tasks_per_worker, ntask;
	struct digest_list_entry *dlist = NULL, *cur_dig, *dtmp;
+
	struct pollfd *pfd = NULL;
+
	int cur_pipe[2];

	int retcode = EPKG_OK;

	char *repopath[2];
-
	char repodb[MAXPATHLEN];
-
	char *manifest_digest;
-
	FILE *psyml, *fsyml, *mandigests, *fconflicts;
-

-
	psyml = fsyml = mandigests = fconflicts = NULL;
+
	char packagesite[MAXPATHLEN],
+
		 filesite[MAXPATHLEN],
+
		 repodb[MAXPATHLEN];
+
	FILE *mandigests = NULL;

	if (!is_dir(path)) {
		pkg_emit_error("%s is not a directory", path);
@@ -309,16 +613,16 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
		goto cleanup;
	}

-
	snprintf(repodb, sizeof(repodb), "%s/%s", output_dir,
+
	snprintf(packagesite, sizeof(packagesite), "%s/%s", output_dir,
	    repo_packagesite_file);
-
	if ((psyml = fopen(repodb, "w")) == NULL) {
+
	if (access(packagesite, W_OK) == -1) {
		retcode = EPKG_FATAL;
		goto cleanup;
	}
	if (filelist) {
-
		snprintf(repodb, sizeof(repodb), "%s/%s", output_dir,
+
		snprintf(filesite, sizeof(filesite), "%s/%s", output_dir,
		    repo_filesite_file);
-
		if ((fsyml = fopen(repodb, "w")) == NULL) {
+
		if (access(filesite, W_OK) == -1) {
			retcode = EPKG_FATAL;
			goto cleanup;
		}
@@ -330,92 +634,90 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
		goto cleanup;
	}

-
	snprintf(repodb, sizeof(repodb), "%s/%s", output_dir, repo_conflicts_file);
-
	if ((fconflicts = fopen(repodb, "w")) == NULL) {
+
	len = 0;
+

+
	pkg_create_repo_read_fts(&fts_items, fts, path, &len);
+

+
	if (len == 0) {
+
		/* Nothing to do */
+
		pkg_emit_error("No package files have been found");
		retcode = EPKG_FATAL;
		goto cleanup;
	}

-
	thd_data.root_path = path;
-
	thd_data.max_results = num_workers;
-
	thd_data.num_results = 0;
-
	thd_data.stop = false;
-
	thd_data.fts = fts;
-
	thd_data.read_files = filelist;
-
	pthread_mutex_init(&thd_data.fts_m, NULL);
-
	thd_data.results = NULL;
-
	thd_data.thd_finished = 0;
-
	pthread_mutex_init(&thd_data.results_m, NULL);
-
	pthread_cond_init(&thd_data.has_result, NULL);
-
	pthread_cond_init(&thd_data.has_room, NULL);
+
	/* Split items over all workers */
+
	num_workers = MIN(num_workers, len);
+
	tasks_per_worker = ceil((double)len / num_workers);

	/* Launch workers */
-
	tids = calloc(num_workers, sizeof(pthread_t));
-
	for (int i = 0; i < num_workers; i++) {
-
		pthread_create(&tids[i], NULL, (void *)&pkg_read_pkg_file, &thd_data);
-
	}
-

-
	for (;;) {
-
		struct pkg_result *r;
-
		const char *origin;
-

-
		long manifest_pos, files_pos, manifest_length;
+
	pfd = calloc(num_workers, sizeof(struct pollfd));
+
	ntask = 0;
+
	LL_FOREACH(fts_items, fts_cur) {
+
		if (ntask % tasks_per_worker == 0) {
+
			/* Create new worker */
+
			int nworker = ntask / tasks_per_worker;
+

+
			if (pipe(cur_pipe) == -1) {
+
				pkg_emit_errno("pkg_create_repo", "pipe");
+
				retcode = EPKG_FATAL;
+
				goto cleanup;
+
			}

-
		pthread_mutex_lock(&thd_data.results_m);
-
		while ((r = thd_data.results) == NULL) {
-
			if (thd_data.thd_finished == num_workers) {
-
				break;
+
			if (pkg_create_repo_worker(fts_cur, tasks_per_worker, packagesite,
+
				filelist ? filesite : NULL, cur_pipe[1]) == EPKG_FATAL) {
+
				close(cur_pipe[0]);
+
				close(cur_pipe[1]);
+
				retcode = EPKG_FATAL;
+
				goto cleanup;
			}
-
			pthread_cond_wait(&thd_data.has_result, &thd_data.results_m);
-
		}
-
		if (r != NULL) {
-
			DL_DELETE(thd_data.results, thd_data.results);
-
			thd_data.num_results--;
-
			pthread_cond_signal(&thd_data.has_room);
-
		}
-
		pthread_mutex_unlock(&thd_data.results_m);
-
		if (r == NULL) {
-
			break;
-
		}

-
		if (r->retcode != EPKG_OK) {
-
			free(r);
-
			continue;
+
			pfd[nworker].fd = cur_pipe[0];
+
			pfd[nworker].events = POLL_IN;
+
			close(cur_pipe[1]);
		}
+
		ntask ++;
+
	}

-
		/* EPKG_END returned */
-

-
		if (progress != NULL)
-
			progress(r->pkg, data);
-

-
		manifest_pos = ftell(psyml);
-
		pkg_emit_manifest_file(r->pkg, psyml, PKG_MANIFEST_EMIT_COMPACT, &manifest_digest);
-
		manifest_length = ftell(psyml) - manifest_pos;
-
		if (filelist) {
-
			files_pos = ftell(fsyml);
-
			pkg_emit_filelist(r->pkg, fsyml);
-
		} else {
-
			files_pos = 0;
+
	while(num_workers > 0) {
+
		retcode = poll(pfd, num_workers, -1);
+
		if (retcode == -1) {
+
			if (errno == EINTR) {
+
				continue;
+
			}
+
			else {
+
				retcode = EPKG_FATAL;
+
				goto cleanup;
+
			}
+
		}
+
		else if (retcode > 1) {
+
			for (i = 0; i < num_workers; i ++) {
+
				if (pfd[i].revents & POLL_IN) {
+
					if (pkg_create_repo_read_pipe(pfd[i].fd, &dlist) != EPKG_OK) {
+
						/*
+
						 * Wait for the worker finished
+
						 */
+
						int st;
+

+
						if (wait(&st) == -1)
+
							pkg_emit_errno("pkg_create_repo", "wait");
+

+
						num_workers --;
+
					}
+
				}
+
			}
		}
-

-
		pkg_get(r->pkg, PKG_ORIGIN, &origin);
-

-
		cur_dig = malloc(sizeof (struct digest_list_entry));
-
		cur_dig->origin = strdup(origin);
-
		cur_dig->digest = manifest_digest;
-
		cur_dig->manifest_pos = manifest_pos;
-
		cur_dig->files_pos = files_pos;
-
		cur_dig->manifest_length = manifest_length;
-
		DL_APPEND(dlist, cur_dig);
-

-
		pkg_free(r->pkg);
-
		free(r);
	}

	/* Now sort all digests */
	DL_SORT(dlist, pkg_digest_sort_compare_func);

+
	/*
+
	 * XXX: it is not used actually
+
	 */
+
#if 0
	pkg_repo_write_conflicts(conflicts, fconflicts);
+
#endif
+

cleanup:
	HASH_ITER (hh, conflicts, curcb, tmpcb) {
		HASH_ITER (hh, curcb->conflicts, c, ctmp) {
@@ -426,6 +728,16 @@ cleanup:
		HASH_DEL(conflicts, curcb);
		free(curcb);
	}
+
	/* Close pipes */
+
	if (pfd != NULL) {
+
		for (i = 0; i < num_workers; i ++)
+
			close(pfd[i].fd);
+
		free(pfd);
+
	}
+
	if (fts != NULL)
+
		fts_close(fts);
+

+
	LL_FREE(fts_items, pkg_create_repo_fts_free);
	LL_FOREACH_SAFE(dlist, cur_dig, dtmp) {
		fprintf(mandigests, "%s:%s:%ld:%ld:%ld\n", cur_dig->origin,
		    cur_dig->digest, cur_dig->manifest_pos, cur_dig->files_pos,
@@ -434,32 +746,10 @@ cleanup:
		free(cur_dig->origin);
		free(cur_dig);
	}
-
	if (tids != NULL) {
-
		/* Cancel running threads */
-
		if (retcode != EPKG_OK) {
-
			pthread_mutex_lock(&thd_data.fts_m);
-
			thd_data.stop = true;
-
			pthread_mutex_unlock(&thd_data.fts_m);
-
		}
-
		/* Join on threads to release thread IDs */
-
		for (int i = 0; i < num_workers; i++) {
-
			pthread_join(tids[i], NULL);
-
		}
-
		free(tids);
-
	}

	if (fts != NULL)
		fts_close(fts);

-
	if (fsyml != NULL)
-
		fclose(fsyml);
-

-
	if (psyml != NULL)
-
		fclose(psyml);
-

-
	if (fconflicts != NULL)
-
		fclose(fconflicts);
-

	if (mandigests != NULL)
		fclose(mandigests);