X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=http_recv.c;h=c418af129a0560fdb09e2aa2cf32e1c7b68146ed;hp=da28faaaecd2bb19948d96a3606310c3cb9fbb84;hb=e56519a6ef7f28ce4974fc83400418e8dcb47f33;hpb=f4fb0289834e76a23839719ab0e89fe29b591d3f diff --git a/http_recv.c b/http_recv.c index da28faaa..c418af12 100644 --- a/http_recv.c +++ b/http_recv.c @@ -21,11 +21,14 @@ #include "para.h" #include "http.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "http_recv.cmdline.h" #include "error.h" #include "net.h" #include "string.h" +#include "fd.h" /** the output buffer size of the http receiver */ #define BUFSIZE (32 * 1024) @@ -83,67 +86,73 @@ static void http_shutdown(void) static char *make_request_msg(void) { char *ret, *hn = para_hostname(); - ret = make_message("%s\nHost: %s\nUser-Agent: para_recv/%s\n\n\n", - HTTP_GET_MSG, hn, VERSION); + ret = make_message("%s1.0\nHost: %s\nUser-Agent: para_recv/%s\n\n\n", + HTTP_GET_MSG, hn, PACKAGE_VERSION); free(hn); return ret; } -static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds, - __unused struct timeval *timeout) +static void http_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = 1; if (phd->status == HTTP_CONNECTED) - FD_SET(phd->fd, wfds); + para_fd_set(phd->fd, &s->wfds, &s->max_fileno); else - FD_SET(phd->fd, rfds); - return phd->fd; + para_fd_set(phd->fd, &s->rfds, &s->max_fileno); } -static int http_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds) + +static void http_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - if (!select_ret) /* we're not interested in timeouts */ - return 1; + t->ret = -E_HTTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) + goto out; + t->ret = 1; + if (!s->select_ret) + goto out; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, wfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->wfds)) + goto out; rq = make_request_msg(); - PARA_NOTICE_LOG("%s", "sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + PARA_INFO_LOG("%s", "sending http request\n"); + t->ret = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (ret < 0) - return E_SEND_HTTP_REQUEST; - phd->status = HTTP_SENT_GET_REQUEST; - return 1; + if (t->ret > 0) + phd->status = HTTP_SENT_GET_REQUEST; + goto out; } - if (!FD_ISSET(phd->fd, rfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->rfds)) + goto out; if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); - if (ret < 0) - return -E_MISSING_OK; - PARA_NOTICE_LOG("%s", "received ok msg, streaming\n"); + t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); + if (t->ret < 0) + goto out; + PARA_INFO_LOG("%s", "received ok msg, streaming\n"); + t->ret = 1; phd->status = HTTP_STREAMING; - return 1; + goto out; } - /* already streaming */ - if (rn->loaded >= BUFSIZE) { - PARA_ERROR_LOG("%s", "buffer overrun\n"); - return -E_OVERRUN; + t->ret = -E_OVERRUN; + if (rn->loaded >= BUFSIZE) + goto out; + t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, + BUFSIZE - rn->loaded); + if (t->ret <= 0) { + if (!t->ret) + t->ret = -E_HTTP_RECV_EOF; + goto out; } - ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded); - return ret < 0? -E_HTTP_RECV_BUF : 0; - } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; +out: + if (t->ret < 0) + rn->eof = 1; } static void http_recv_close(struct receiver_node *rn) @@ -175,21 +184,25 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; - ret = -E_HOST_INFO; - if (!(he = get_host_info(conf->host_arg))) + ret = get_host_info(conf->host_arg, &he); + if (!ret < 0) goto err_out; /* get new socket */ - ret = -E_SOCKET; - if ((phd->fd = get_socket()) < 0) + ret = get_socket(); + if (ret < 0) goto err_out; + phd->fd = ret; /* init their_addr */ init_sockaddr(&their_addr, conf->port_arg, he); /* connect */ PARA_NOTICE_LOG("connecting to %s:%d\n", conf->host_arg, conf->port_arg); ret = para_connect(phd->fd, &their_addr); - if (ret < 0) + if (ret < 0) { + close(phd->fd); goto err_out; + } + mark_fd_nonblock(phd->fd); phd->status = HTTP_CONNECTED; return 1; err_out: @@ -209,8 +222,8 @@ void http_recv_init(struct receiver *r) { r->open = http_recv_open; r->close = http_recv_close; - r->pre_select = http_pre_select; - r->post_select = http_post_select; + r->pre_select = http_recv_pre_select; + r->post_select = http_recv_post_select; r->shutdown = http_shutdown; r->parse_config = http_recv_parse_config; }