Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Migrate from pipes to sockets.
Vsevolod Stakhov committed 11 years ago
commit 486306aa988aa43ff4ae7d57e3e72b3c128af6ff
parent a4f90df
2 files changed +50 -8
modified configure.ac
@@ -136,6 +136,18 @@ AC_SUBST([GIT_HEAD])
AC_DEFINE_UNQUOTED([GIT_HEAD], $GIT_HEAD)
AC_DEFINE([PKG_API], [api_ver])

+
AC_RUN_IFELSE(
+
  [AC_LANG_SOURCE(
+
    [[
+
    #include <sys/types.h>
+
    #include <sys/socket.h>
+
    int main() { return socket(AF_LOCAL, SOCK_SEQPACKET, 0) == -1 ? -1 : 0; }
+
    ]]
+
  )],
+
  [AC_DEFINE([HAVE_SEQPACKET], 1, [Define to 1 if you have SOCK_SEQPACKET working])]
+
)
+

+
AC_SUBST([HAVE_SEQPACKET])

AC_SEARCH_LIBS([lzma_version_string], [lzma], [], [
  AC_MSG_ERROR([unable to find the liblzma])
modified libpkg/pkg_repo_create.c
@@ -33,6 +33,7 @@
#include <sys/stat.h>
#include <sys/sysctl.h>
#include <sys/wait.h>
+
#include <sys/socket.h>

#include <archive_entry.h>
#include <assert.h>
@@ -56,6 +57,7 @@
#include "private/utils.h"
#include "private/pkg.h"
#include "private/pkgdb.h"
+
#include "pkg_config.h"

struct digest_list_entry {
	char *origin;
@@ -263,6 +265,7 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
	char checksum[SHA256_DIGEST_LENGTH * 3 + 1], *mdigest = NULL;
	char digestbuf[1024];
	struct iovec iov[2];
+
	struct msghdr msg;

	struct sbuf *b = sbuf_new_auto();

@@ -389,7 +392,12 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,

			free(mdigest);
			mdigest = NULL;
-
			write(pip, digestbuf, r);
+
			iov[0].iov_base = digestbuf;
+
			iov[0].iov_len = r;
+
			memset(&msg, 0, sizeof(msg));
+
			msg.msg_iov = iov;
+
			msg.msg_iovlen = 1;
+
			sendmsg(pip, &msg, MSG_EOR);
		}
		cur_job ++;
	}
@@ -397,6 +405,7 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
cleanup:
	pkg_manifest_keys_free(keys);

+
	write(pip, ".\n", 2);
	close(pip);
	close(mfd);
	if (read_files)
@@ -427,6 +436,10 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
		if (r == -1) {
			if (errno == EINTR)
				continue;
+
			else if (errno == ECONNRESET) {
+
				/* Treat it as the end of a connection */
+
				return (EPKG_END);
+
			}

			pkg_emit_errno("pkg_create_repo_read_pipe", "read");
			return (EPKG_FATAL);
@@ -445,7 +458,7 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
		if (buf[i] == ':') {
			switch(state) {
			case s_set_origin:
-
				dig = malloc(sizeof(*dig));
+
				dig = calloc(1, sizeof(*dig));
				dig->origin = malloc(i - start + 1);
				strlcpy(dig->origin, &buf[start], i - start + 1);
				state = s_set_digest;
@@ -473,11 +486,16 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
		}
		else if (buf[i] == '\n') {
			dig->manifest_length = strtol(&buf[start], NULL, 10);
+
			assert(dig->origin != NULL);
+
			assert(dig->digest != NULL);
			DL_APPEND(*dlist, dig);
			state = s_set_origin;
			start = i + 1;
			break;
		}
+
		else if (buf[i] == '.' && buf[i + 1] == '\n') {
+
			return (EPKG_END);
+
		}
	}

	return (EPKG_OK);
@@ -492,7 +510,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,

	struct pkg_conflict *c, *ctmp;
	struct pkg_conflict_bulk *conflicts = NULL, *curcb, *tmpcb;
-
	int num_workers, i;
+
	int num_workers, i, remaining_workers;
	size_t len, tasks_per_worker, ntask;
	struct digest_list_entry *dlist = NULL, *cur_dig, *dtmp;
	struct pollfd *pfd = NULL;
@@ -597,8 +615,14 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
		if (ntask % tasks_per_worker == 0) {
			/* Create new worker */
			int nworker = ntask / tasks_per_worker;
+
			int ofl;
+
			int st = SOCK_DGRAM;

-
			if (pipe(cur_pipe) == -1) {
+
#ifdef HAVE_SEQPACKET
+
			st = SOCK_SEQPACKET;
+
#endif
+

+
			if (socketpair(AF_UNIX, st, 0, cur_pipe) == -1) {
				pkg_emit_errno("pkg_create_repo", "pipe");
				retcode = EPKG_FATAL;
				goto cleanup;
@@ -616,12 +640,18 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
			pfd[nworker].fd = cur_pipe[0];
			pfd[nworker].events = POLL_IN;
			close(cur_pipe[1]);
+
			/* Make our end of the pipe non-blocking */
+
			ofl = fcntl(cur_pipe[0], F_GETFL, 0);
+
			fcntl(cur_pipe[0], F_SETFL, ofl | O_NONBLOCK);
		}
		ntask ++;
	}

	ntask = 0;
-
	while(num_workers > 0) {
+
	remaining_workers = num_workers;
+
	while(remaining_workers > 0) {
+
		int st;
+

		retcode = poll(pfd, num_workers, -1);
		if (retcode == -1) {
			if (errno == EINTR) {
@@ -634,12 +664,11 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
		}
		else if (retcode > 0) {
			for (i = 0; i < num_workers; i ++) {
-
				if (pfd[i].revents & POLL_IN) {
+
				if (pfd[i].revents & (POLL_IN|POLL_HUP|POLL_ERR)) {
					if (pkg_create_repo_read_pipe(pfd[i].fd, &dlist) != EPKG_OK) {
						/*
						 * Wait for the worker finished
						 */
-
						int st;

						while (wait(&st) == -1) {
							if (errno == EINTR)
@@ -649,7 +678,8 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
							break;
						}

-
						num_workers --;
+
						remaining_workers --;
+
						pfd[i].events = 0;
					}
					else {
						pkg_emit_progress_tick(ntask++, len);