X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=http_recv.c;h=096541679ea5ae30116f478436c2303c1aabf567;hp=49c10f9550fb529a8def9ff3b03c74b3e772e4bb;hb=27638103f249ffbe9768603b9baff199950fd9f6;hpb=e0e5a7c1a04c6a2ee4a475e823657e06e6df2f99 diff --git a/http_recv.c b/http_recv.c index 49c10f95..09654167 100644 --- a/http_recv.c +++ b/http_recv.c @@ -21,11 +21,14 @@ #include "para.h" #include "http.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "http_recv.cmdline.h" #include "error.h" #include "net.h" #include "string.h" +#include "fd.h" /** the output buffer size of the http receiver */ #define BUFSIZE (32 * 1024) @@ -83,67 +86,73 @@ static void http_shutdown(void) static char *make_request_msg(void) { char *ret, *hn = para_hostname(); - ret = make_message("%s\nHost: %s\nUser-Agent: para_recv/%s\n\n\n", + ret = make_message("%s1.0\nHost: %s\nUser-Agent: para_recv/%s\n\n\n", HTTP_GET_MSG, hn, VERSION); free(hn); return ret; } -static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds, - __a_unused struct timeval *timeout) +static void http_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = 1; if (phd->status == HTTP_CONNECTED) - FD_SET(phd->fd, wfds); + para_fd_set(phd->fd, &s->wfds, &s->max_fileno); else - FD_SET(phd->fd, rfds); - return phd->fd; + para_fd_set(phd->fd, &s->rfds, &s->max_fileno); } -static int http_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds) + +static void http_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - if (!select_ret) /* we're not interested in timeouts */ - return 1; + t->ret = -E_HTTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) + goto out; + t->ret = 1; + if (!s->select_ret) + goto out; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, wfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->wfds)) + goto out; rq = make_request_msg(); PARA_NOTICE_LOG("%s", "sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + t->ret = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (ret < 0) - return E_SEND_HTTP_REQUEST; - phd->status = HTTP_SENT_GET_REQUEST; - return 1; + if (t->ret > 0) + phd->status = HTTP_SENT_GET_REQUEST; + goto out; } - if (!FD_ISSET(phd->fd, rfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->rfds)) + goto out; if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); - if (ret < 0) - return -E_MISSING_OK; + t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); + if (t->ret < 0) + goto out; PARA_NOTICE_LOG("%s", "received ok msg, streaming\n"); + t->ret = 1; phd->status = HTTP_STREAMING; - return 1; - } - /* already streaming */ - if (rn->loaded >= BUFSIZE) { - PARA_ERROR_LOG("%s", "buffer overrun\n"); - return -E_OVERRUN; + goto out; } - ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded); - return ret < 0? -E_HTTP_RECV_BUF : 0; + t->ret = -E_OVERRUN; + if (rn->loaded >= BUFSIZE) + goto out; + t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, + BUFSIZE - rn->loaded); + if (t->ret <= 0) { + if (!t->ret) + t->ret = -E_HTTP_RECV_EOF; + goto out; } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; +out: + if (t->ret < 0) + rn->eof = 1; } static void http_recv_close(struct receiver_node *rn) @@ -175,13 +184,15 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; - ret = -E_HOST_INFO; - if (!(he = get_host_info(conf->host_arg))) + PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn); + ret = get_host_info(conf->host_arg, &he); + if (!ret < 0) goto err_out; /* get new socket */ - ret = -E_SOCKET; - if ((phd->fd = get_socket()) < 0) + ret = get_socket(); + if (ret < 0) goto err_out; + phd->fd = ret; /* init their_addr */ init_sockaddr(&their_addr, conf->port_arg, he); /* connect */ @@ -190,6 +201,7 @@ static int http_recv_open(struct receiver_node *rn) ret = para_connect(phd->fd, &their_addr); if (ret < 0) goto err_out; + mark_fd_nonblock(phd->fd); phd->status = HTTP_CONNECTED; return 1; err_out: @@ -209,8 +221,8 @@ void http_recv_init(struct receiver *r) { r->open = http_recv_open; r->close = http_recv_close; - r->pre_select = http_pre_select; - r->post_select = http_post_select; + r->pre_select = http_recv_pre_select; + r->post_select = http_recv_post_select; r->shutdown = http_shutdown; r->parse_config = http_recv_parse_config; }