Radish alpha
H
rad:z3QDZAW2FAfuLvihrhiyDC9fAD8G9
HardenedBSD Package Manager
Radicle
Git
Fix pkg ssh by creating a fetch(3) like api
Baptiste Daroussin committed 12 years ago
commit 6b71cee9660a1d56288c2a12d3633730e6d76687
parent f2bc50e
4 files changed +259 -96
modified libpkg/fetch.c
@@ -26,9 +26,8 @@
 */

#include <sys/param.h>
-
#include <sys/stat.h>
-
#include <sys/event.h>
-
#include <sys/time.h>
+
#include <sys/wait.h>
+
#include <sys/socket.h>

#include <ctype.h>
#include <fcntl.h>
@@ -39,7 +38,7 @@
#include <time.h>
#include <unistd.h>
#include <fetch.h>
-
#include <pthread.h>
+
#include <paths.h>

#include "pkg.h"
#include "private/event.h"
@@ -119,6 +118,146 @@ pkg_fetch_file(struct pkg_repo *repo, const char *url, const char *dest, time_t
}

static int
+
ssh_cache_data(struct pkg_repo *repo, char *src, size_t nbytes)
+
{
+
	char *tmp;
+

+
	if (repo->sshio.cache.size < nbytes) {
+
		tmp = realloc(repo->sshio.cache.buf, nbytes);
+
		if (tmp == NULL)
+
			return (-1);
+

+
		repo->sshio.cache.buf = tmp;
+
		repo->sshio.cache.size = nbytes;
+
	}
+
	memcpy(repo->sshio.cache.buf, src, nbytes);
+
	repo->sshio.cache.len = nbytes;
+
	repo->sshio.cache.pos = 0;
+

+
	return (0);
+
}
+

+
static int
+
ssh_read(void *data, char *buf, int len)
+
{
+
	struct pkg_repo *repo = (struct pkg_repo *) data;
+
	struct timeval now, timeout, delta;
+
	fd_set readfds;
+
	ssize_t rlen, total;
+
	char *start;
+

+
	pkg_debug(2, "ssh: start reading");
+

+
	if (fetchTimeout > 0) {
+
		gettimeofday(&timeout, NULL);
+
		timeout.tv_sec += fetchTimeout;
+
	}
+

+
	total = 0;
+
	start = buf;
+

+
	if (repo->sshio.cache.len > 0) {
+
		/*
+
		 * The last invocation of fetch_read was interrupted by a
+
		 * signal after some data had been read from the socket. Copy
+
		 * the cached data into the supplied buffer before trying to
+
		 * read from the socket again.
+
		 */
+
		total = (repo->sshio.cache.len < (size_t)len) ? repo->sshio.cache.len : len;
+
		memcpy(buf, repo->sshio.cache.buf, total);
+

+
		repo->sshio.cache.len -= total;
+
		repo->sshio.cache.pos += total;
+
		len -= total;
+
		buf += total;
+
	}
+

+

+
	while (len > 0) {
+
		if (repo->tofetch > 0 && repo->tofetch == repo->fetched)
+
			break;
+

+
		rlen = read(repo->sshio.in, buf, len);
+
		if (rlen == 0) {
+
			break;
+
		} else if (rlen > 0) {
+
			len -= rlen;
+
			buf += rlen;
+
			if (repo->tofetch > 0)
+
				repo->fetched += rlen;
+
			total += rlen;
+
			continue;
+
		} else if (rlen == -1) {
+
			if (errno == EINTR)
+
				ssh_cache_data(repo, start, total);
+
			if (errno != EAGAIN) {
+
				pkg_emit_errno("timeout", "ssh");
+
				return (-1);
+
			}
+
			if (errno == EAGAIN && total > 0) {
+
				break;
+
			}
+
		}
+

+
		FD_ZERO(&readfds);
+
		while (!FD_ISSET(repo->sshio.in, &readfds)) {
+
			FD_SET(repo->sshio.in, &readfds);
+
			if (fetchTimeout > 0) {
+
				gettimeofday(&now, NULL);
+
				if (!timercmp(&timeout, &now, >)) {
+
					errno = ETIMEDOUT;
+
					return (-1);
+
				}
+
				timersub(&timeout, &now, &delta);
+
			}
+
			errno = 0;
+
			if (select(repo->sshio.in + 1, &readfds, NULL, NULL,
+
			    fetchTimeout > 0 ? &delta : NULL) < 0) {
+
				if (errno == EINTR) {
+
					/* Save anything that was read. */
+
					ssh_cache_data(repo, start, total);
+
					continue;
+
				}
+
				return (-1);
+
			}
+
		}
+
	}
+

+
	pkg_debug(2, "ssh: have read %d bytes", total);
+
	return (total);
+
}
+

+
static int
+
ssh_write(void *data, const char *buf, int l)
+
{
+
	struct pkg_repo *repo = (struct pkg_repo *)data;
+

+
	return (write(repo->sshio.out, buf, l));
+
}
+

+
static int
+
ssh_close(void *data)
+
{
+
	struct pkg_repo *repo = (struct pkg_repo *)data;
+
	int pstat;
+

+
	free(repo->sshio.cache.buf);
+

+
	write(repo->sshio.out, "quit\n", 5);
+

+
	while (waitpid(repo->sshio.pid, &pstat, 0) == -1) {
+
		if (errno != EINTR)
+
			return (EPKG_FATAL);
+
	}
+

+
	repo->ssh = NULL;
+
	repo->tofetch = 0;
+
	repo->fetched = 0;
+

+
	return (WEXITSTATUS(pstat));
+
}
+

+
static int
start_ssh(struct pkg_repo *repo, struct url *u, off_t *sz)
{
	char *line = NULL;
@@ -127,42 +266,87 @@ start_ssh(struct pkg_repo *repo, struct url *u, off_t *sz)
	struct sbuf *cmd = NULL;
	const char *errstr;
	const char *ssh_args;
+
	int sshin[2];
+
	int sshout[2];
+
	const char *argv[4];

	pkg_config_string(PKG_CONFIG_SSH_ARGS, &ssh_args);

	if (repo->ssh == NULL) {
-
		cmd = sbuf_new_auto();
-
		sbuf_cat(cmd, "/usr/bin/ssh -e none -T ");
-
		if (ssh_args != NULL)
-
			sbuf_printf(cmd, "%s ", ssh_args);
-
		if (u->port > 0)
-
			sbuf_printf(cmd, "-P %d ", u->port);
-
		if (u->user[0] != '\0')
-
			sbuf_printf(cmd, "%s@", u->user);
-
		sbuf_cat(cmd, u->host);
-
		sbuf_printf(cmd, " pkg ssh");
-
		sbuf_finish(cmd);
-
		pkg_debug(1, "Fetch: running '%s'", sbuf_data(cmd));
-
		if ((repo->ssh = popen(sbuf_data(cmd), "r+")) == NULL) {
-
			pkg_emit_errno("popen", "ssh");
-
			sbuf_delete(cmd);
+
		/* Use socket pair because pipe have blocking issues */
+
		if (socketpair(AF_UNIX, SOCK_STREAM, 0, sshin) <0 ||
+
		    socketpair(AF_UNIX, SOCK_STREAM, 0, sshout) < 0)
+
			return(EPKG_FATAL);
+

+
		set_nonblocking(sshout[0]);
+
		set_nonblocking(sshout[1]);
+
		set_nonblocking(sshin[0]);
+
		set_nonblocking(sshin[1]);
+

+
		repo->sshio.pid = vfork();
+
		if (repo->sshio.pid == -1) {
+
			pkg_emit_errno("Cannot fork", "start_ssh");
			return (EPKG_FATAL);
		}
-
		sbuf_delete(cmd);
+

+
		if (repo->sshio.pid == 0) {
+
			if (dup2(sshin[0], STDIN_FILENO) < 0 ||
+
			    close(sshin[1]) < 0 ||
+
			    close(sshout[0]) < 0 ||
+
			    dup2(sshout[1], STDOUT_FILENO) < 0) {
+
				pkg_emit_errno("Cannot prepare pipes", "start_ssh");
+
				return (EPKG_FATAL);
+
			}
+

+
			cmd = sbuf_new_auto();
+
			sbuf_cat(cmd, "/usr/bin/ssh -e none -T ");
+
			if (ssh_args != NULL)
+
				sbuf_printf(cmd, "%s ", ssh_args);
+
			if (u->port > 0)
+
				sbuf_printf(cmd, "-P %d ", u->port);
+
			if (u->user[0] != '\0')
+
				sbuf_printf(cmd, "%s@", u->user);
+
			sbuf_cat(cmd, u->host);
+
			sbuf_printf(cmd, " pkg ssh");
+
			sbuf_finish(cmd);
+
			pkg_debug(1, "Fetch: running '%s'", sbuf_data(cmd));
+
			argv[0] = _PATH_BSHELL;
+
			argv[1] = "-c";
+
			argv[2] = sbuf_data(cmd);
+
			argv[3] = NULL;
+

+
			if (sshin[0] != STDIN_FILENO)
+
				close(sshin[0]);
+
			if (sshout[1] != STDOUT_FILENO)
+
				close(sshout[1]);
+
			execvp(argv[0], __DECONST(char **, argv));
+
			/* NOT REACHED */
+
		}
+

+
		if (close(sshout[1]) < 0 || close(sshin[0]) < 0) {
+
			pkg_emit_errno("Failed to close pipes", "start_ssh");
+
			return (EPKG_FATAL);
+
		}
+

+
		repo->sshio.in = sshout[0];
+
		repo->sshio.out = sshin[1];
+
		set_nonblocking(repo->sshio.in);
+

+
		repo->ssh = funopen(repo, ssh_read, ssh_write, NULL, ssh_close);

		if (getline(&line, &linecap, repo->ssh) > 0) {
			if (strncmp(line, "ok:", 3) != 0) {
-
				pclose(repo->ssh);
+
				fclose(repo->ssh);
				free(line);
				return (EPKG_FATAL);
			}
		} else {
-
			pclose(repo->ssh);
+
			fclose(repo->ssh);
			return (EPKG_FATAL);
		}
	}
-

	fprintf(repo->ssh, "get %s %" PRIdMAX "\n", u->doc, (intmax_t)u->ims_time);
+
	repo->tofetch = 0;
	if ((linelen = getline(&line, &linecap, repo->ssh)) > 0) {
		if (line[linelen -1 ] == '\n')
			line[linelen -1 ] = '\0';
@@ -181,7 +365,9 @@ start_ssh(struct pkg_repo *repo, struct url *u, off_t *sz)
			free(line);
			return (EPKG_OK);
		}
+

	}
+

	free(line);
	return (EPKG_FATAL);
}
@@ -199,7 +385,6 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t

	int64_t		 max_retry, retry;
	int64_t		 fetch_timeout;
-
	int		 tmout;
	time_t		 begin_dl;
	time_t		 now;
	time_t		 last = 0;
@@ -211,9 +396,6 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t
	struct dns_srvinfo	*srv_current = NULL;
	struct http_mirror	*http_current = NULL;
	off_t		 sz = 0;
-
	int		 kq = -1, flags = 0;
-
	struct kevent	 e, ev;
-
	struct timespec	 ts;
	bool		 pkg_url_scheme = false;

	if (pkg_config_int64(PKG_CONFIG_FETCH_RETRY, &max_retry) == EPKG_FATAL)
@@ -262,28 +444,6 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t
		if ((retcode = start_ssh(repo, u, &sz)) != EPKG_OK)
			goto cleanup;
		remote = repo->ssh;
-
		kq = kqueue();
-
		if (kq == -1) {
-
			pkg_emit_errno("kqueue", "ssh");
-
			retcode = EPKG_FATAL;
-
			goto cleanup;
-
		}
-
		EV_SET(&e, fileno(repo->ssh), EVFILT_READ, EV_ADD, 0, 0, 0);
-
		if (kevent(kq, &e, 1, NULL, 0, NULL) == -1) {
-
			pkg_emit_errno("kevent", "ssh");
-
			retcode = EPKG_FATAL;
-
			goto cleanup;
-
		}
-
		if ((flags = fcntl(fileno(repo->ssh), F_GETFL)) == -1) {
-
			pkg_emit_errno("fcntl", "set ssh non-blocking");
-
			retcode = EPKG_FATAL;
-
			goto cleanup;
-
		}
-
		if (fcntl(fileno(repo->ssh), F_SETFL, flags | O_NONBLOCK) == -1) {
-
			pkg_emit_errno("fcntl", "set ssh non-blocking");
-
			retcode = EPKG_FATAL;
-
			goto cleanup;
-
		}
	}

	doc = u->doc;
@@ -357,6 +517,7 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t
			}
		}
	}
+

	if (strcmp(u->scheme, "ssh") != 0) {
		if (t != NULL && st.mtime != 0) {
			if (st.mtime < *t) {
@@ -369,37 +530,11 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t
	}

	now = begin_dl = time(NULL);
-
	tmout = fetch_timeout;
+
	repo->tofetch = sz;
+
	repo->fetched = 0;
	while (done < sz) {
-
		if (kq == -1) {
-
			if ((r = fread(buf, 1, sizeof(buf), remote)) < 1)
-
				break;
-
		} else {
-
			ts.tv_sec = tmout;
-
			ts.tv_nsec = 0;
-
			if (kevent(kq, &e, 1, &ev, 1, &ts) == -1) {
-
				if (time(NULL) - now > fetch_timeout) {
-
					pkg_emit_error("Fetch timeout");
-
					retcode = EPKG_FATAL;
-
					goto cleanup;
-
				}
-
				if (errno == EINTR) {
-
					tmout = fetch_timeout - (time(NULL) - now);
-
					continue;
-
				}
-
				pkg_emit_errno("kevent", "ssh");
-
				retcode = EPKG_FATAL;
-
				goto cleanup;
-
			}
-
			if (ev.data == 0)
-
				break;
-
			size_t size = (size_t)ev.data;
-
			if (size > sizeof(buf))
-
				size = sizeof(buf);
-
			if ((r = fread(buf, 1, size, remote)) < 1)
-
				break;
-
			tmout = fetchTimeout;
-
		}
+
		if ((r = fread(buf, 1, sizeof(buf), remote)) < 1)
+
			break;

		if (write(dest, buf, r) != r) {
			pkg_emit_errno("write", "");
@@ -415,6 +550,7 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t
			last = now;
		}
	}
+
	repo->tofetch = 0;

	if (done < sz) {
		pkg_emit_error("An error occurred while fetching package");
@@ -430,26 +566,11 @@ pkg_fetch_file_to_fd(struct pkg_repo *repo, const char *url, int dest, time_t *t

	cleanup:

-
	if (u != NULL && strcmp(u->scheme, "ssh") != 0) {
-
		if (remote != NULL)
+
	if (u != NULL) {
+
		if (remote != NULL && remote != repo->ssh)
			fclose(remote);
-
	} else {
-
		EV_SET(&e, fileno(repo->ssh), EVFILT_READ, EV_DELETE, 0, 0, 0);
-
		kevent(kq, &e, 1, NULL, 0, NULL);
-
		flags &= ~O_NONBLOCK;
-
		if (fcntl(fileno(repo->ssh), F_SETFL, flags) == -1)
-
			flags = -1;
-

-
		/* if something went wrong close the ssh connection */
-
		if (flags == -1) {
-
			pclose(repo->ssh);
-
			repo->ssh = NULL;
-
		}
	}

-
	if (kq != -1)
-
		close(kq);
-

	/* restore original doc */
	u->doc = doc;

modified libpkg/private/pkg.h
@@ -272,6 +272,20 @@ struct pkg_repo {
	signature_t signature_type;
	char *fingerprints;
	FILE *ssh;
+

+
	struct {
+
		int in;
+
		int out;
+
		pid_t pid;
+
		struct {
+
			char *buf;
+
			size_t size;
+
			size_t pos;
+
			size_t len;
+
		} cache;
+
	} sshio;
+
	size_t fetched;
+
	size_t tofetch;
	bool enable;
	UT_hash_handle hh;
};
modified libpkg/private/utils.h
@@ -107,6 +107,8 @@ struct dns_srvinfo *

int set_nameserver(const char *nsname);
ucl_object_t *yaml_to_ucl(const char *file, const char *buffer, size_t len);
+
void set_blocking(int fd);
+
void set_nonblocking(int fd);


#endif
modified libpkg/utils.c
@@ -598,3 +598,29 @@ yaml_to_ucl(const char *file, const char *buffer, size_t len) {

	return (obj);
}
+

+
void
+
set_nonblocking(int fd)
+
{
+
	int flags;
+

+
	if ((flags = fcntl(fd, F_GETFL)) == -1)
+
		return;
+
	if (!(flags & O_NONBLOCK)) {
+
		flags |= O_NONBLOCK;
+
		fcntl(fd, F_SETFL, flags);
+
	}
+
}
+

+
void
+
set_blocking(int fd)
+
{
+
	int flags;
+

+
	if ((flags = fcntl(fd, F_GETFL)) == -1)
+
		return;
+
	if (flags & O_NONBLOCK) {
+
		flags &= ~O_NONBLOCK;
+
		fcntl(fd, F_SETFL, flags);
+
	}
+
}