Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
repo: switch from fork model to threads
Baptiste Daroussin committed 2 years ago
commit 1f258b9e892ae87619d6bcd72bfe60eab963cd85
parent ae6dcd7
1 file changed +74 -267
modified libpkg/pkg_repo_create.c
@@ -1,5 +1,5 @@
/*-
-
 * Copyright (c) 2011-2021 Baptiste Daroussin <bapt@FreeBSD.org>
+
 * Copyright (c) 2011-2023 Baptiste Daroussin <bapt@FreeBSD.org>
 * Copyright (c) 2011-2012 Julien Laffaye <jlaffaye@FreeBSD.org>
 * Copyright (c) 2011-2012 Marin Atanasov Nikolov <dnaeon@gmail.com>
 * Copyright (c) 2012-2013 Matthew Seaman <matthew@FreeBSD.org>
@@ -49,12 +49,11 @@
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>
-
#include <fcntl.h>
#include <math.h>
-
#include <poll.h>
#include <sys/uio.h>
-
#include <msgpuck.h>
+
#include <pthread.h>

+
#include "tllist.h"
#include "pkg.h"
#include "private/event.h"
#include "private/utils.h"
@@ -153,8 +152,8 @@ struct pkg_fts_item {
	char *fts_name;
	off_t fts_size;
	int fts_info;
-
	struct pkg_fts_item *next;
};
+
typedef tll(struct pkg_fts_item *) fts_item_t;

static struct pkg_fts_item*
pkg_create_repo_fts_new(FTSENT *fts, const char *root_path)
@@ -188,7 +187,7 @@ pkg_create_repo_fts_free(struct pkg_fts_item *item)
}

static int
-
pkg_create_repo_read_fts(struct pkg_fts_item **items, FTS *fts,
+
pkg_create_repo_read_fts(fts_item_t *items, FTS *fts,
	const char *repopath, size_t *plen, struct pkg_repo_meta *meta)
{
	FTSENT *fts_ent;
@@ -287,7 +286,7 @@ pkg_create_repo_read_fts(struct pkg_fts_item **items, FTS *fts,
		if (fts_cur == NULL)
			return (EPKG_FATAL);

-
		LL_PREPEND(*items, fts_cur);
+
		tll_push_front(*items, fts_cur);
		(*plen) ++;
	}

@@ -299,114 +298,50 @@ pkg_create_repo_read_fts(struct pkg_fts_item **items, FTS *fts,
	return (EPKG_OK);
}

-
static void
-
tell_parent(int fd, char *buf, size_t len)
-
{
-
	struct iovec iov[2];
-
	struct msghdr msg;
-

-
	iov[0].iov_base = buf;
-
	iov[0].iov_len = len;
-
	memset(&msg, 0, sizeof(msg));
-
	msg.msg_iov = iov;
-
	msg.msg_iovlen = 1;
-
	sendmsg(fd, &msg, MSG_EOR);
-
}
+
struct thr_env {	/* state shared between pkg_create_repo() and every pkg_create_repo_thread() worker */
+
	int ntask;	/* count of queue items workers are done with; incremented under nlock, compared against the item total by pkg_create_repo() */
+
	int ffd;	/* fd for meta->filesite output; stays -1 when no file list was requested */
+
	int mfd;	/* fd for meta->manifests output; workers flock() it around each writev() */
+
	struct pkg_repo_meta *meta;	/* repository metadata; workers test meta->hash and pass it to hash_file() */
+
	fts_item_t fts_items;	/* work queue filled by pkg_create_repo_read_fts(); workers pop from the front under llock */
+
	pthread_mutex_t nlock;	/* guards ntask and is the mutex paired with cond. NOTE(review): no pthread_mutex_init() call is visible in this change -- confirm initialisation */
+
	pthread_mutex_t llock;	/* guards fts_items */
+
	pthread_cond_t cond;	/* signalled by a worker after each ntask increment; pkg_create_repo() waits on it to emit progress ticks */
+
};

-
static int
-
pkg_create_repo_worker(int mfd, int ffd, int pip,
-
	struct pkg_repo_meta *meta)
+
static void *
+
pkg_create_repo_thread(void *arg)
{
-
	pid_t pid;
-
	struct pollfd *pfd = NULL;
+
	struct thr_env *te = (struct thr_env *)arg;
	int flags, ret = EPKG_OK;
-
	size_t sz;
	struct pkg *pkg = NULL;
-
	char digestbuf[1024];
	xstring *b;
	struct iovec iov[2];
-
	uint32_t len;
-
	char buf[1024];
-
	char *w, *path;
-
	const char *rbuf, *c, *repopath;
+
	char *path;
+
	const char *repopath;
+
	struct pkg_fts_item *items = NULL;

	b = xstring_new();

-
	pid = fork();
-
	switch(pid) {
-
	case -1:
-
		pkg_emit_errno("pkg_create_repo_worker", "fork");
-
		xstring_free(b);
-
		return (EPKG_FATAL);
-
		break;
-
	case 0:
-
		break;
-
	default:
-
		/* Parent */
-
		xstring_free(b);
-
		return (EPKG_OK);
-
		break;
-
	}
-

	pkg_debug(1, "start worker to parse packages");

-
	if (ffd != -1)
+
	if (te->ffd != -1)
		flags = PKG_OPEN_MANIFEST_ONLY;
	else
		flags = PKG_OPEN_MANIFEST_ONLY | PKG_OPEN_MANIFEST_COMPACT;

-
	/* We are reading to digest buf but it's only to check the socketpair */
-
	if (read(pip, digestbuf, 1) == -1) {
-
		pkg_emit_errno("pkg_create_repo_worker", "read");
-
		goto cleanup;
-
	}
-

-
	pfd = xcalloc(1, sizeof(struct pollfd));
-
	pfd[0].fd = pip;
-
	pfd[0].events = POLLIN;
-

	for (;;) {
-
		w = buf;
-
		w = mp_encode_array(w, 1);
-
		w = mp_encode_uint(w, MSG_PKG_READY);
-
		tell_parent(pip, buf, w - buf);
-
		if (poll(pfd, 1, -1) == -1) {
-
			if (errno == EINTR)
-
				continue;
-
			else
-
				goto cleanup;
-
		}
-
		if (pfd[0].revents & (POLLIN|POLLHUP|POLLERR)) {
-
			for (;;) {
-
				ssize_t r;
-
				r = read(pfd[0].fd, buf, sizeof(buf));
-
				if (r == -1) {
-
					if (errno == EINTR)
-
						continue;
-
					else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
						return (EPKG_OK);
-
					}
-
					pkg_emit_errno("pkg_repo_worker", "read");
-
					return (EPKG_FATAL);
-
				} else if (r == 0)
-
					return (EPKG_END);
-
				else
-
					break;
-
			}
-
		} else {
-
			continue;
+
		if (items != NULL)
+
			pkg_create_repo_fts_free(items);
+
		pthread_mutex_lock(&te->llock);
+
		if (tll_length(te->fts_items) == 0) {
+
			pthread_mutex_unlock(&te->llock);
+
			goto cleanup;
		}
-
		rbuf = buf;
-
		sz = mp_decode_array(&rbuf);
-
		if (sz < 1)
-
			continue;
-
		c = mp_decode_str(&rbuf, &len);
-
		if (len == 0) /* empty package name means ends of repo */
-
			break;
-
		path = xstrndup(c, len);
-
		repopath = mp_decode_str(&rbuf, &len);
-
		if (len == 0) /* empty package name means ends of repo */
-
			break;
+
		items = tll_pop_front(te->fts_items);
+
		pthread_mutex_unlock(&te->llock);
+
		path = items->fts_accpath;
+
		repopath = items->pkg_path;
		if (pkg_open(&pkg, path, flags) == EPKG_OK) {
			size_t mlen;
			struct stat st;
@@ -414,8 +349,8 @@ pkg_create_repo_worker(int mfd, int ffd, int pip,
			pkg->sum = pkg_checksum_file(path, PKG_HASH_TYPE_SHA256_HEX);
			stat(path, &st);
			pkg->pkgsize = st.st_size;
-
			if (meta->hash) {
-
				ret = hash_file(meta, pkg, path);
+
			if (te->meta->hash) {
+
				ret = hash_file(te->meta, pkg, path);
				if (ret != EPKG_OK)
					goto cleanup;
			} else {
@@ -430,7 +365,7 @@ pkg_create_repo_worker(int mfd, int ffd, int pip,
			fflush(b->fp);
			mlen = strlen(b->buf);

-
			if (flock(mfd, LOCK_EX) == -1) {
+
			if (flock(te->mfd, LOCK_EX) == -1) {
				pkg_emit_errno("pkg_create_repo_worker", "flock");
				ret = EPKG_FATAL;
				goto cleanup;
@@ -441,103 +376,41 @@ pkg_create_repo_worker(int mfd, int ffd, int pip,
			iov[1].iov_base = (void *)"\n";
			iov[1].iov_len = 1;

-
			if (writev(mfd, iov, 2) == -1) {
+
			if (writev(te->mfd, iov, 2) == -1) {
				pkg_emit_errno("pkg_create_repo_worker", "write");
				ret = EPKG_FATAL;
-
				flock(mfd, LOCK_UN);
+
				flock(te->mfd, LOCK_UN);
				goto cleanup;
			}

-
			flock(mfd, LOCK_UN);
+
			flock(te->mfd, LOCK_UN);

-
			if (ffd != -1) {
+
			if (te->ffd != -1) {
				FILE *fl;

-
				if (flock(ffd, LOCK_EX) == -1) {
+
				if (flock(te->ffd, LOCK_EX) == -1) {
					pkg_emit_errno("pkg_create_repo_worker", "flock");
					ret = EPKG_FATAL;
					goto cleanup;
				}
-
				fl = fdopen(dup(ffd), "a");
+
				fl = fdopen(dup(te->ffd), "a");
				pkg_emit_filelist(pkg, fl);
				fclose(fl);

-
				flock(ffd, LOCK_UN);
+
				flock(te->ffd, LOCK_UN);
			}
-

-
			/* send a tick */
-
			w = buf;
-
			w = mp_encode_array(w, 1);
-
			w = mp_encode_uint(w, MSG_PKG_DONE);
-
			tell_parent(pip, buf, w - buf);
		}
-
		free(path);
+
		pthread_mutex_lock(&te->nlock);
+
		te->ntask++;
+
		pthread_cond_signal(&te->cond);
+
		pthread_mutex_unlock(&te->nlock);
	}

cleanup:
	xstring_free(b);
-
	close(pip);

	pkg_debug(1, "worker done");
-
	_exit(ret);
-
}
-

-
static int
-
pkg_create_repo_read_pipe(int fd, struct pkg_fts_item **items)
-
{
-
	char buf[1024];
-
	int r;
-
	size_t sz;
-
	uint64_t msgtype;
-
	const char *rbuf;
-

-
	for (;;) {
-
		r = read(fd, buf, sizeof(buf));
-
		if (r == -1) {
-
			if (errno == EINTR)
-
				continue;
-
			else if (errno == ECONNRESET) {
-
				/* Treat it as the end of a connection */
-
				return (EPKG_END);
-
			}
-
			else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
				return (EPKG_OK);
-
			}
-

-
			pkg_emit_errno("pkg_create_repo_read_pipe", "read");
-
			return (EPKG_FATAL);
-
		}
-
		else if (r == 0)
-
			return (EPKG_END);
-

-
		rbuf = buf;
-
		sz = mp_decode_array(&rbuf);
-
		if (sz < 1)
-
			continue;
-
		msgtype = mp_decode_uint(&rbuf);
-

-
		if (msgtype == MSG_PKG_DONE) {
-
			return (EPKG_OK);
-
		}
-

-
		if (msgtype == MSG_PKG_READY) {
-
			char *w;
-
			const char *str = (*items) != NULL ? (*items)->fts_accpath : "";
-
			const char *str2 = (*items) != NULL ? (*items)->pkg_path : "";
-
			w = buf;
-
			w = mp_encode_array(w, 2);
-
			w = mp_encode_str(w, str, strlen(str));
-
			w = mp_encode_str(w, str2, strlen(str2) + 1);
-
			if (*items != NULL)
-
				LL_DELETE((*items), (*items));
-
			tell_parent(fd, buf, w - buf);
-
		}
-
	}
-

-
	/*
-
	 * Never reached
-
	 */
-
	return (EPKG_OK);
+
	return (NULL);
}

#ifdef __linux__
@@ -562,11 +435,11 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	const char *metafile, bool hash, bool hash_symlink)
{
	FTS *fts = NULL;
-
	struct pkg_fts_item *fts_items = NULL;
-
	int num_workers, i, remaining_workers;
-
	size_t len, ntask;
-
	struct pollfd *pfd = NULL;
-
	int cur_pipe[2], fd, outputdir_fd, mfd, ffd;
+
	int num_workers;
+
	pthread_t *threads;
+
	struct thr_env te = { 0 };
+
	size_t len;
+
	int fd, outputdir_fd;
	struct pkg_repo_meta *meta = NULL;
	int retcode = EPKG_FATAL;
	ucl_object_t *meta_dump;
@@ -575,7 +448,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	char *repopath[2];
	char repodb[MAXPATHLEN];

-
	mfd = ffd = -1;
+
	te.mfd = te.ffd = -1;

	if (!is_dir(path)) {
		pkg_emit_error("%s is not a directory", path);
@@ -620,6 +493,8 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	meta->hash = hash;
	meta->hash_symlink = hash_symlink;

+
	te.meta = meta;
+

	repopath[0] = path;
	repopath[1] = NULL;

@@ -635,12 +510,12 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
		goto cleanup;
	}

-
	if ((mfd = openat(outputdir_fd, meta->manifests,
+
	if ((te.mfd = openat(outputdir_fd, meta->manifests,
	     O_CREAT|O_TRUNC|O_WRONLY, 00644)) == -1) {
		goto cleanup;
	}
	if (filelist) {
-
		if ((ffd = openat(outputdir_fd, meta->filesite,
+
		if ((te.ffd = openat(outputdir_fd, meta->filesite,
		        O_CREAT|O_TRUNC|O_WRONLY, 00644)) == -1) {
			goto cleanup;
		}
@@ -648,7 +523,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,

	len = 0;

-
	pkg_create_repo_read_fts(&fts_items, fts, path, &len, meta);
+
	pkg_create_repo_read_fts(&te.fts_items, fts, path, &len, meta);

	if (len == 0) {
		/* Nothing to do */
@@ -662,89 +537,22 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	/* Launch workers */
	pkg_emit_progress_start("Creating repository in %s", output_dir);

-
	pfd = xcalloc(num_workers, sizeof(struct pollfd));
-
	ntask = 0;
+
	threads = xcalloc(num_workers, sizeof(pthread_t));

	for (int i = 0; i < num_workers; i++) {
		/* Create new worker */
-
		int ofl;
-

-
		if (get_socketpair(cur_pipe) == -1) {
-
			pkg_emit_errno("pkg_create_repo", "pipe");
-
			goto cleanup;
-
		}
-

-
		if (pkg_create_repo_worker(mfd,
-
		    ffd, cur_pipe[1], meta) == EPKG_FATAL) {
-
			close(cur_pipe[0]);
-
			close(cur_pipe[1]);
-
			goto cleanup;
-
		}
-

-
		pfd[i].fd = cur_pipe[0];
-
		pfd[i].events = POLLIN;
-
		close(cur_pipe[1]);
-
		/* Make our end of the pipe non-blocking */
-
		ofl = fcntl(cur_pipe[0], F_GETFL, 0);
-
		fcntl(cur_pipe[0], F_SETFL, ofl | O_NONBLOCK);
-
	}
-

-
	/* Send start marker to all workers */
-
	for (i = 0; i < num_workers; i ++) {
-
		if (write(pfd[i].fd, ".", 1) == -1)
-
			pkg_emit_errno("pkg_create_repo", "write");
+
		pthread_create(&threads[i], NULL, &pkg_create_repo_thread, &te);
	}

-
	ntask = 0;
-
	remaining_workers = num_workers;
-
	while(remaining_workers > 0) {
-
		int st;
-

-
		pkg_debug(1, "checking for %d workers", remaining_workers);
-
		retcode = poll(pfd, num_workers, -1);
-
		if (retcode == -1) {
-
			if (errno == EINTR) {
-
				continue;
-
			}
-
			else {
-
				goto cleanup;
-
			}
-
		}
-
		else if (retcode > 0) {
-
			for (i = 0; i < num_workers; i ++) {
-
				if (pfd[i].fd != -1 &&
-
								(pfd[i].revents & (POLLIN|POLLHUP|POLLERR))) {
-
					if (pkg_create_repo_read_pipe(pfd[i].fd, &fts_items) != EPKG_OK) {
-
						/*
-
						 * Wait for the worker finished
-
						 */
-
						while (wait(&st) == -1) {
-
							if (errno == EINTR)
-
								continue;
-

-
							pkg_emit_errno("pkg_create_repo", "wait");
-
							break;
-
						}
-

-
						remaining_workers --;
-
						pkg_debug(1, "finished worker, %d remaining",
-
							remaining_workers);
-
						pfd[i].events = 0;
-
						pfd[i].revents = 0;
-
						close(pfd[i].fd);
-
						pfd[i].fd = -1;
-
					} else {
-
						if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
							errno = 0;
-
							continue;
-
						}
-
						pkg_emit_progress_tick(ntask++, len);
-
					}
-
				}
-
			}
-
		}
+
	pthread_mutex_lock(&te.nlock);
+
	while (te.ntask < len) {
+
		pthread_cond_wait(&te.cond, &te.nlock);
+
		pkg_emit_progress_tick(te.ntask, len);
	}
+
	pthread_mutex_unlock(&te.nlock);

+
	for (int i = 0; i < num_workers; i++)
+
		pthread_join(threads[i], NULL);
	pkg_emit_progress_tick(len, len);

	/* Write metafile */
@@ -770,16 +578,15 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
cleanup:
	if (outputdir_fd != -1)
		close(outputdir_fd);
-
	if (mfd != -1)
-
		close(mfd);
-
	if (ffd != -1)
-
		close(ffd);
-
	if (pfd != NULL)
-
		free(pfd);
+
	if (te.mfd != -1)
+
		close(te.mfd);
+
	if (te.ffd != -1)
+
		close(te.ffd);
	if (fts != NULL)
		fts_close(fts);

-
	LL_FREE(fts_items, pkg_create_repo_fts_free);
+
	tll_free_and_free(te.fts_items, pkg_create_repo_fts_free);
+
	//LL_FREE(te.fts_items, pkg_create_repo_fts_free);

	pkg_repo_meta_free(meta);