Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
pkg repo: rework the balance between the workers
Baptiste Daroussin committed 4 years ago
commit e48aa7bdedcb9ceca9512e8d6d6c3a977a901cdc
parent 20d6d8c
1 file changed +91 -56
modified libpkg/pkg_repo_create.c
@@ -61,6 +61,7 @@

enum {
	MSG_PKG_DONE=0,
+
	MSG_PKG_READY,
	MSG_DIGEST,
};

@@ -335,22 +336,23 @@ tell_parent(int fd, char *buf, size_t len)
}

static int
-
pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
-
	int mfd, int ffd, int pip,
+
pkg_create_repo_worker(int mfd, int ffd, int pip,
	struct pkg_repo_meta *meta)
{
	pid_t pid;
+
	struct pollfd *pfd = NULL;
	int flags, ret = EPKG_OK;
-
	size_t cur_job = 0;
-
	struct pkg_fts_item *cur;
+
	size_t sz;
	struct pkg *pkg = NULL;
	struct pkg_manifest_key *keys = NULL;
	char *mdigest = NULL;
	char digestbuf[1024];
	xstring *b;
	struct iovec iov[2];
+
	uint32_t len;
	char buf[1024];
	char *w;
+
	const char *rbuf, *c;

	b = xstring_new();

@@ -371,7 +373,7 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
	}

	pkg_manifest_keys_new(&keys);
-
	pkg_debug(1, "start worker to parse %jd packages", (intmax_t)nelts);
+
	pkg_debug(1, "start worker to parse packages");

	if (ffd != -1)
		flags = PKG_OPEN_MANIFEST_ONLY;
@@ -384,23 +386,63 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
		goto cleanup;
	}

-
	LL_FOREACH(start, cur) {
-
		if (cur_job >= nelts)
-
			break;
+
	pfd = xcalloc(1, sizeof(struct pollfd));
+
	pfd[0].fd = pip;
+
	pfd[0].events = POLLIN;

-
		if (pkg_open(&pkg, cur->fts_accpath, keys, flags) == EPKG_OK) {
+
	for (;;) {
+
		w = buf;
+
		w = mp_encode_array(w, 1);
+
		w = mp_encode_uint(w, MSG_PKG_READY);
+
		tell_parent(pip, buf, w - buf);
+
		if (poll(pfd, 1, -1) == -1) {
+
			if (errno == EINTR)
+
				continue;
+
			else
+
				goto cleanup;
+
		}
+
		if (pfd[0].revents & (POLLIN|POLLHUP|POLLERR)) {
+
			for (;;) {
+
				ssize_t r;
+
				r = read(pfd[0].fd, buf, sizeof(buf));
+
				if (r == -1) {
+
					if (errno == EINTR)
+
						continue;
+
					else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+
						return (EPKG_OK);
+
					}
+
					pkg_emit_errno("pkg_repo_worker", "read");
+
					return (EPKG_FATAL);
+
				} else if (r == 0)
+
					return (EPKG_END);
+
				else
+
					break;
+
			}
+
		} else {
+
			continue;
+
		}
+
		rbuf = buf;
+
		sz = mp_decode_array(&rbuf);
+
		if (sz < 1)
+
			continue;
+
		c = mp_decode_str(&rbuf, &len);
+
		if (len == 0) /* empty package name means ends of repo */
+
			break;
+
		char *path = xstrndup(c, len);
+
		if (pkg_open(&pkg, path, keys, flags) == EPKG_OK) {
			off_t mpos, fpos = 0;
			size_t mlen;
+
			struct stat st;

-
			pkg->sum = pkg_checksum_file(cur->fts_accpath,
-
			    PKG_HASH_TYPE_SHA256_HEX);
-
			pkg->pkgsize = cur->fts_size;
+
			pkg->sum = pkg_checksum_file(path, PKG_HASH_TYPE_SHA256_HEX);
+
			stat(path, &st);
+
			pkg->pkgsize = st.st_size;
			if (meta->hash) {
-
				ret = hash_file(meta, pkg, cur->fts_accpath);
+
				ret = hash_file(meta, pkg, path);
				if (ret != EPKG_OK)
					goto cleanup;
			} else {
-
				pkg->repopath = xstrdup(cur->pkg_path);
+
				pkg->repopath = xstrdup(path);
			}

			/*
@@ -480,7 +522,7 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
			w = mp_encode_uint(w, MSG_PKG_DONE);
			tell_parent(pip, buf, w - buf);
		}
-
		cur_job ++;
+
		free(path);
	}

cleanup:
@@ -494,7 +536,7 @@ cleanup:
}

static int
-
pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
+
pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist, struct pkg_fts_item **items)
{
	struct digest_list_entry *dig = NULL;
	char buf[1024];
@@ -507,7 +549,6 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
	for (;;) {
		dig = NULL;
		r = read(fd, buf, sizeof(buf));
-

		if (r == -1) {
			if (errno == EINTR)
				continue;
@@ -549,6 +590,17 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
			dig->checksum = xstrndup(c, len);
			DL_APPEND(*dlist, dig);
		}
+

+
		if (msgtype == MSG_PKG_READY) {
+
			char *w;
+
			const char *str = (*items) != NULL ? (*items)->fts_accpath : "";
+
			w = buf;
+
			w = mp_encode_array(w, 1);
+
			w = mp_encode_str(w, str, strlen(str));
+
			if (*items != NULL)
+
				LL_DELETE((*items), (*items));
+
			tell_parent(fd, buf, w - buf);
+
		}
	}

	/*
@@ -579,10 +631,10 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	const char *metafile, bool hash, bool hash_symlink)
{
	FTS *fts = NULL;
-
	struct pkg_fts_item *fts_items = NULL, *fts_cur, *fts_start;
+
	struct pkg_fts_item *fts_items = NULL;
	pkghash *conflicts = NULL;
	struct pkg_conflict_bulk *curcb;
-
	int num_workers, i, remaining_workers, remain, cur_jobs, remain_jobs, nworker;
+
	int num_workers, i, remaining_workers, remain;
	size_t len, tasks_per_worker, ntask;
	struct digest_list_entry *dlist = NULL, *cur_dig, *dtmp;
	struct pollfd *pfd = NULL;
@@ -698,45 +750,29 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,

	pfd = xcalloc(num_workers, sizeof(struct pollfd));
	ntask = 0;
-
	cur_jobs = (remain > 0) ? tasks_per_worker + 1 : tasks_per_worker;
-
	remain_jobs = cur_jobs;
-
	fts_start = fts_items;
-
	nworker = 0;
-

-
	LL_FOREACH(fts_items, fts_cur) {
-
		if (--remain_jobs == 0) {
-
			/* Create new worker */
-
			int ofl;
-

-
			if (get_socketpair(cur_pipe) == -1) {
-
				pkg_emit_errno("pkg_create_repo", "pipe");
-
				goto cleanup;
-
			}

-
			if (pkg_create_repo_worker(fts_start, cur_jobs, mfd,
-
			    ffd, cur_pipe[1], meta) == EPKG_FATAL) {
-
				close(cur_pipe[0]);
-
				close(cur_pipe[1]);
-
				goto cleanup;
-
			}
+
	for (int i = 0; i < num_workers; i++) {
+
		/* Create new worker */
+
		int ofl;

-
			pfd[nworker].fd = cur_pipe[0];
-
			pfd[nworker].events = POLLIN;
-
			close(cur_pipe[1]);
-
			/* Make our end of the pipe non-blocking */
-
			ofl = fcntl(cur_pipe[0], F_GETFL, 0);
-
			fcntl(cur_pipe[0], F_SETFL, ofl | O_NONBLOCK);
-

-
			if (--remain > 0)
-
				cur_jobs = tasks_per_worker + 1;
-
			else
-
				cur_jobs = tasks_per_worker;
+
		if (get_socketpair(cur_pipe) == -1) {
+
			pkg_emit_errno("pkg_create_repo", "pipe");
+
			goto cleanup;
+
		}

-
			remain_jobs = cur_jobs;
-
			fts_start = fts_cur->next;
-
			nworker ++;
+
		if (pkg_create_repo_worker(mfd,
+
		    ffd, cur_pipe[1], meta) == EPKG_FATAL) {
+
			close(cur_pipe[0]);
+
			close(cur_pipe[1]);
+
			goto cleanup;
		}
-
		ntask ++;
+

+
		pfd[i].fd = cur_pipe[0];
+
		pfd[i].events = POLLIN;
+
		close(cur_pipe[1]);
+
		/* Make our end of the pipe non-blocking */
+
		ofl = fcntl(cur_pipe[0], F_GETFL, 0);
+
		fcntl(cur_pipe[0], F_SETFL, ofl | O_NONBLOCK);
	}

	/* Send start marker to all workers */
@@ -764,11 +800,10 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
			for (i = 0; i < num_workers; i ++) {
				if (pfd[i].fd != -1 &&
								(pfd[i].revents & (POLLIN|POLLHUP|POLLERR))) {
-
					if (pkg_create_repo_read_pipe(pfd[i].fd, &dlist) != EPKG_OK) {
+
					if (pkg_create_repo_read_pipe(pfd[i].fd, &dlist, &fts_items) != EPKG_OK) {
						/*
						 * Wait for the worker finished
						 */
-

						while (wait(&st) == -1) {
							if (errno == EINTR)
								continue;