Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Use msgpack to format ipc calls
Baptiste Daroussin committed 6 years ago
commit 371fdeea583fc49c957bebecfac3ec9ea1b3a3de
parent 4cada7c
1 file changed +73 -99
modified libpkg/pkg_repo_create.c
@@ -52,6 +52,7 @@
#include <math.h>
#include <poll.h>
#include <sys/uio.h>
+
#include <msgpuck.h>

#include "pkg.h"
#include "private/event.h"
@@ -59,6 +60,10 @@
#include "private/pkg.h"
#include "private/pkgdb.h"

+
enum {
+
	MSG_PKG_DONE=0,
+
	MSG_DIGEST,
+
};

struct digest_list_entry {
	char *origin;
@@ -204,6 +209,20 @@ pkg_create_repo_read_fts(struct pkg_fts_item **items, FTS *fts,
	return (EPKG_OK);
}

+
static void
+
tell_parent(int fd, char *buf, size_t len)
+
{
+
	struct iovec iov[2];
+
	struct msghdr msg;
+

+
	iov[0].iov_base = buf;
+
	iov[0].iov_len = len;
+
	memset(&msg, 0, sizeof(msg));
+
	msg.msg_iov = iov;
+
	msg.msg_iovlen = 1;
+
	sendmsg(fd, &msg, MSG_EOR);
+
}
+

static int
pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
	int mfd, int ffd, int pip,
@@ -217,9 +236,10 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
	struct pkg_manifest_key *keys = NULL;
	char *mdigest = NULL;
	char digestbuf[1024];
-
	struct iovec iov[2];
-
	struct msghdr msg;
	UT_string *b;
+
	struct iovec iov[2];
+
	char buf[1024];
+
	char *w;

	utstring_new(b);

@@ -258,7 +278,6 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
			break;

		if (pkg_open(&pkg, cur->fts_accpath, keys, flags) == EPKG_OK) {
-
			int r;
			off_t mpos, fpos = 0;
			size_t mlen;

@@ -326,32 +345,29 @@ pkg_create_repo_worker(struct pkg_fts_item *start, size_t nelts,
			}

			if (meta->version == 1) {
-
				r = snprintf(digestbuf, sizeof(digestbuf), "%s:%s:%ld:%ld:%ld:%s\n",
-
				    pkg->origin,
-
				    mdigest,
-
				    (long)mpos,
-
				    (long)fpos,
-
				    (long)mlen,
-
				    pkg->sum);
-

-
				free(mdigest);
-
				mdigest = NULL;
-
				iov[0].iov_base = digestbuf;
-
				iov[0].iov_len = r;
-
				memset(&msg, 0, sizeof(msg));
-
				msg.msg_iov = iov;
-
				msg.msg_iovlen = 1;
-
				sendmsg(pip, &msg, MSG_EOR);
+
				w = buf;
+
				w = mp_encode_array(w, 7);
+
				w = mp_encode_uint(w, MSG_DIGEST);
+
				w = mp_encode_str(w, pkg->origin, strlen(pkg->origin));
+
				w = mp_encode_str(w, mdigest, strlen(mdigest));
+
				w = mp_encode_uint(w, mpos);
+
				w = mp_encode_uint(w, fpos);
+
				w = mp_encode_uint(w, mlen);
+
				w = mp_encode_str(w, pkg->sum, strlen(pkg->sum));
+
				tell_parent(pip, buf, w - buf);
			}
+
			/* send a tick */
+
			w = buf;
+
			w = mp_encode_array(w, 1);
+
			w = mp_encode_uint(w, MSG_PKG_DONE);
+
			tell_parent(pip, buf, w - buf);
		}
		cur_job ++;
	}

cleanup:
	pkg_manifest_keys_free(keys);
-

	utstring_free(b);
-
	write(pip, ".\n", 2);
	close(pip);
	free(mdigest);

@@ -360,19 +376,15 @@ cleanup:
}

static int
-
pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist, struct pkg_repo_meta *meta)
+
pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist)
{
	struct digest_list_entry *dig = NULL;
	char buf[1024];
-
	int r, i, start;
-
	enum {
-
		s_set_origin = 0,
-
		s_set_digest,
-
		s_set_mpos,
-
		s_set_fpos,
-
		s_set_mlen,
-
		s_set_checksum
-
	} state = 0;
+
	int r;
+
	size_t sz;
+
	uint32_t len;
+
	uint64_t msgtype;
+
	const char *rbuf;

	for (;;) {
		dig = NULL;
@@ -385,8 +397,9 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist, struct pkg_r
				/* Treat it as the end of a connection */
				return (EPKG_END);
			}
-
			else if (errno == EAGAIN || errno == EWOULDBLOCK)
+
			else if (errno == EAGAIN || errno == EWOULDBLOCK) {
				return (EPKG_OK);
+
			}

			pkg_emit_errno("pkg_create_repo_read_pipe", "read");
			return (EPKG_FATAL);
@@ -394,71 +407,29 @@ pkg_create_repo_read_pipe(int fd, struct digest_list_entry **dlist, struct pkg_r
		else if (r == 0)
			return (EPKG_END);

-
		/* 
-
		 * Don't bother adding to the digest list if we 
-
		 * aren't creating a repo v1
-
		 */
-
		if (meta->version != 1)
+
		rbuf = buf;
+
		sz = mp_decode_array(&rbuf);
+
		if (sz < 1)
			continue;
+
		msgtype = mp_decode_uint(&rbuf);

-
		/*
-
		 * XXX: can parse merely full lines
-
		 */
-
		start = 0;
-
		for (i = 0; i < r; i ++) {
-
			if (buf[i] == ':') {
-
				switch(state) {
-
				case s_set_origin:
-
					dig = xcalloc(1, sizeof(*dig));
-
					dig->origin = xmalloc(i - start + 1);
-
					strlcpy(dig->origin, &buf[start], i - start + 1);
-
					state = s_set_digest;
-
					break;
-
				case s_set_digest:
-
					dig->digest = xmalloc(i - start + 1);
-
					strlcpy(dig->digest, &buf[start], i - start + 1);
-
					state = s_set_mpos;
-
					break;
-
				case s_set_mpos:
-
					dig->manifest_pos = strtol(&buf[start], NULL, 10);
-
					state = s_set_fpos;
-
					break;
-
				case s_set_fpos:
-
					dig->files_pos = strtol(&buf[start], NULL, 10);
-
					state = s_set_mlen;
-
					break;
-
				case s_set_mlen:
-
					dig->manifest_length = strtol(&buf[start], NULL, 10);
-
					state = s_set_checksum;
-
					break;
-
				case s_set_checksum:
-
					dig->checksum =  xmalloc(i - start + 1);
-
					strlcpy(dig->digest, &buf[start], i - start + 1);
-
					state = s_set_origin;
-
					break;
-
				}
-
				start = i + 1;
-
			}
-
			else if (buf[i] == '\n') {
-
				if (state == s_set_mlen) {
-
					dig->manifest_length = strtol(&buf[start], NULL, 10);
-
				}
-
				else if (state == s_set_checksum) {
-
					dig->checksum =  xmalloc(i - start + 1);
-
					strlcpy(dig->checksum, &buf[start], i - start + 1);
-
				}
-
				assert(dig->origin != NULL);
-
				assert(dig->digest != NULL);
-
				DL_APPEND(*dlist, dig);
-
				state = s_set_origin;
-
				break;
-
			}
-
			else if (buf[i] == '.' && buf[i + 1] == '\n') {
-
				if (dig != NULL)
-
					free(dig->origin);
-
				free(dig);
-
				return (EPKG_END);
-
			}
+
		if (msgtype == MSG_PKG_DONE) {
+
			return (EPKG_OK);
+
		}
+

+
		if (msgtype == MSG_DIGEST) {
+
			const char *c;
+
			dig = xcalloc(1, sizeof(*dig));
+
			c = mp_decode_str(&rbuf, &len);
+
			dig->origin = xstrndup(c, len);
+
			c = mp_decode_str(&rbuf, &len);
+
			dig->digest = xstrndup(c, len);
+
			dig->manifest_pos = mp_decode_uint(&rbuf);
+
			dig->files_pos = mp_decode_uint(&rbuf);
+
			dig->manifest_length = mp_decode_uint(&rbuf);
+
			c = mp_decode_str(&rbuf, &len);
+
			dig->checksum = xstrndup(c, len);
+
			DL_APPEND(*dlist, dig);
		}
	}

@@ -653,7 +624,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
	ntask = 0;
	remaining_workers = num_workers;
	while(remaining_workers > 0) {
-
		int st;
+
		int st, r;

		pkg_debug(1, "checking for %d workers", remaining_workers);
		retcode = poll(pfd, num_workers, -1);
@@ -669,7 +640,7 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
			for (i = 0; i < num_workers; i ++) {
				if (pfd[i].fd != -1 &&
								(pfd[i].revents & (POLLIN|POLLHUP|POLLERR))) {
-
					if (pkg_create_repo_read_pipe(pfd[i].fd, &dlist, meta) != EPKG_OK) {
+
					if ((r = pkg_create_repo_read_pipe(pfd[i].fd, &dlist)) != EPKG_OK) {
						/*
						 * Wait for the worker finished
						 */
@@ -689,8 +660,11 @@ pkg_create_repo(char *path, const char *output_dir, bool filelist,
						pfd[i].revents = 0;
						close(pfd[i].fd);
						pfd[i].fd = -1;
-
					}
-
					else {
+
					} else {
+
						if (errno == EAGAIN || errno == EWOULDBLOCK) {
+
							errno = 0;
+
							continue;
+
						}
						pkg_emit_progress_tick(ntask++, len);
					}
				}