X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=http_recv.c;h=3d72c7c7396ca3b8f96f63653aec9c021cb90846;hp=5f1d779991d60dfceefd446c54c0ef700e6ca523;hb=8187d1acf18bfcf63db7b3a35ecb2ea3358fa2ad;hpb=12b81c91b25b9d0d4bce43c4f87220c3c7f991da diff --git a/http_recv.c b/http_recv.c index 5f1d7799..3d72c7c7 100644 --- a/http_recv.c +++ b/http_recv.c @@ -21,11 +21,14 @@ #include "para.h" #include "http.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "http_recv.cmdline.h" #include "error.h" #include "net.h" #include "string.h" +#include "fd.h" /** the output buffer size of the http receiver */ #define BUFSIZE (32 * 1024) @@ -89,61 +92,67 @@ static char *make_request_msg(void) return ret; } -static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds, - __a_unused struct timeval *timeout) +static void http_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = 1; if (phd->status == HTTP_CONNECTED) - FD_SET(phd->fd, wfds); + para_fd_set(phd->fd, &s->wfds, &s->max_fileno); else - FD_SET(phd->fd, rfds); - return phd->fd; + para_fd_set(phd->fd, &s->rfds, &s->max_fileno); } -static int http_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds) + +static void http_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - if (!select_ret) /* we're not interested in timeouts */ - return 1; + t->ret = -E_HTTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) + goto out; + t->ret = 1; + if (!s->select_ret) + goto out; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, wfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->wfds)) + goto out; rq = make_request_msg(); PARA_NOTICE_LOG("%s", "sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + t->ret = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (ret < 0) - return E_SEND_HTTP_REQUEST; - phd->status = HTTP_SENT_GET_REQUEST; - return 1; + if (t->ret > 0) + phd->status = HTTP_SENT_GET_REQUEST; + goto out; } - if (!FD_ISSET(phd->fd, rfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->rfds)) + goto out; if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); - if (ret < 0) - return -E_MISSING_OK; + t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); + if (t->ret < 0) + goto out; PARA_NOTICE_LOG("%s", "received ok msg, streaming\n"); + t->ret = 1; phd->status = HTTP_STREAMING; - return 1; + goto out; } - /* already streaming */ - if (rn->loaded >= BUFSIZE) { - PARA_ERROR_LOG("%s", "buffer overrun\n"); - return -E_OVERRUN; + t->ret = -E_OVERRUN; + if (rn->loaded >= BUFSIZE) + goto out; + t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, + BUFSIZE - rn->loaded); + if (t->ret <= 0) { + if (!t->ret) + t->ret = -E_HTTP_RECV_EOF; + goto out; } - ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded); - return ret < 0? -E_HTTP_RECV_BUF : 0; - } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; +out: + if (t->ret < 0) + rn->eof = 1; } static void http_recv_close(struct receiver_node *rn) @@ -175,8 +184,9 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; - ret = -E_HOST_INFO; - if (!(he = get_host_info(conf->host_arg))) + PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn); + ret = get_host_info(conf->host_arg, &he); + if (!ret < 0) goto err_out; /* get new socket */ ret = -E_SOCKET; @@ -209,8 +219,8 @@ void http_recv_init(struct receiver *r) { r->open = http_recv_open; r->close = http_recv_close; - r->pre_select = http_pre_select; - r->post_select = http_post_select; + r->pre_select = http_recv_pre_select; + r->post_select = http_recv_post_select; r->shutdown = http_shutdown; r->parse_config = http_recv_parse_config; }