Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Rework distributing jobs over repo workers.
Vsevolod Stakhov committed 11 years ago
commit 45c25a1082168369b3197195352868ccd44d2e28
parent 10707d0
1 file changed +29 -7
modified libpkg/pkg_repo_create.c
@@ -308,6 +308,11 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
	else
		flags = PKG_OPEN_MANIFEST_ONLY | PKG_OPEN_MANIFEST_COMPACT;

+
	if (read(pip, digestbuf, 1) == -1) {
+
		pkg_emit_errno("pkg_create_repo_worker", "read");
+
		goto cleanup;
+
	}
+

	LL_FOREACH(start, cur) {
		if (cur_job >= nelts)
			break;
@@ -510,11 +515,11 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	const char *metafile, bool legacy)
{
	FTS *fts = NULL;
-
	struct pkg_fts_item *fts_items = NULL, *fts_cur;
+
	struct pkg_fts_item *fts_items = NULL, *fts_cur, *fts_start;

	struct pkg_conflict *c, *ctmp;
	struct pkg_conflict_bulk *conflicts = NULL, *curcb, *tmpcb;
-
	int num_workers, i, remaining_workers, remain;
+
	int num_workers, i, remaining_workers, remain, cur_jobs, remain_jobs, nworker;
	size_t len, tasks_per_worker, ntask;
	struct digest_list_entry *dlist = NULL, *cur_dig, *dtmp;
	struct pollfd *pfd = NULL;
@@ -621,10 +626,14 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,

	pfd = calloc(num_workers, sizeof(struct pollfd));
	ntask = 0;
+
	cur_jobs = (remain > 0) ? tasks_per_worker + 1 : tasks_per_worker;
+
	remain_jobs = cur_jobs;
+
	fts_start = fts_items;
+
	nworker = 0;
+

	LL_FOREACH(fts_items, fts_cur) {
-
		if (ntask % tasks_per_worker == 0) {
+
		if (--remain_jobs == 0) {
			/* Create new worker */
-
			int nworker = ntask / tasks_per_worker;
			int ofl;
			int st = SOCK_DGRAM;

@@ -637,9 +646,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
				goto cleanup;
			}

-

-
			if (pkg_create_repo_worker(fts_cur, (remain-- > 0) ?
-
					tasks_per_worker + 1 : tasks_per_worker,
+
			if (pkg_create_repo_worker(fts_start, cur_jobs,
					packagesite, (filelist ? filesite : NULL), cur_pipe[1],
					(legacy ? NULL : meta)) == EPKG_FATAL) {
				close(cur_pipe[0]);
@@ -654,10 +661,25 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
			/* Make our end of the pipe non-blocking */
			ofl = fcntl(cur_pipe[0], F_GETFL, 0);
			fcntl(cur_pipe[0], F_SETFL, ofl | O_NONBLOCK);
+

+
			if (--remain > 0)
+
				cur_jobs = tasks_per_worker + 1;
+
			else
+
				cur_jobs = tasks_per_worker;
+

+
			remain_jobs = cur_jobs;
+
			fts_start = fts_cur;
+
			nworker ++;
		}
		ntask ++;
	}

+
	/* Send start marker to all workers */
+
	for (i = 0; i < num_workers; i ++) {
+
		if (write(pfd[i].fd, ".", 1) == -1)
+
			pkg_emit_errno("pkg_create_repo", "write");
+
	}
+

	ntask = 0;
	remaining_workers = num_workers;
	while(remaining_workers > 0) {