X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=http_recv.c;h=b566acf3c0612a48d5796ca01dae5359bb93386e;hp=5f1d779991d60dfceefd446c54c0ef700e6ca523;hb=e89405390dc82917fcde6c00393bd2dfd82ddbfa;hpb=12b81c91b25b9d0d4bce43c4f87220c3c7f991da diff --git a/http_recv.c b/http_recv.c index 5f1d7799..b566acf3 100644 --- a/http_recv.c +++ b/http_recv.c @@ -21,11 +21,14 @@ #include "para.h" #include "http.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "http_recv.cmdline.h" #include "error.h" #include "net.h" #include "string.h" +#include "fd.h" /** the output buffer size of the http receiver */ #define BUFSIZE (32 * 1024) @@ -89,61 +92,62 @@ static char *make_request_msg(void) return ret; } -static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds, - __a_unused struct timeval *timeout) +static void http_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = 1; if (phd->status == HTTP_CONNECTED) - FD_SET(phd->fd, wfds); + para_fd_set(phd->fd, &s->wfds, &s->max_fileno); else - FD_SET(phd->fd, rfds); - return phd->fd; + para_fd_set(phd->fd, &s->rfds, &s->max_fileno); } -static int http_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds) + +static void http_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - if (!select_ret) /* we're not interested in timeouts */ - return 1; + t->ret = 1; + if (!s->select_ret) /* we're not interested in timeouts */ + return; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, wfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->wfds)) + return; /* nothing to do */ rq = make_request_msg(); PARA_NOTICE_LOG("%s", "sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + t->ret = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (ret < 0) - return E_SEND_HTTP_REQUEST; - phd->status = HTTP_SENT_GET_REQUEST; - return 1; + if (t->ret > 0) + phd->status = HTTP_SENT_GET_REQUEST; + return; } - if (!FD_ISSET(phd->fd, rfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->rfds)) + return; /* nothing to do */ if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); - if (ret < 0) - return -E_MISSING_OK; + t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); + if (t->ret < 0) + return; PARA_NOTICE_LOG("%s", "received ok msg, streaming\n"); phd->status = HTTP_STREAMING; - return 1; + return; } + t->ret = -E_OVERRUN; /* already streaming */ - if (rn->loaded >= BUFSIZE) { - PARA_ERROR_LOG("%s", "buffer overrun\n"); - return -E_OVERRUN; + if (rn->loaded >= BUFSIZE) + return; + t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, + BUFSIZE - rn->loaded); + if (t->ret <= 0) { + rn->eof = 1; + if (!t->ret) + t->ret = -E_HTTP_RECV_EOF; + return; } - ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded); - return ret < 0? -E_HTTP_RECV_BUF : 0; - } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; } static void http_recv_close(struct receiver_node *rn) @@ -175,6 +179,7 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; + PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn); ret = -E_HOST_INFO; if (!(he = get_host_info(conf->host_arg))) goto err_out; @@ -209,8 +214,8 @@ void http_recv_init(struct receiver *r) { r->open = http_recv_open; r->close = http_recv_close; - r->pre_select = http_pre_select; - r->post_select = http_post_select; + r->pre_select = http_recv_pre_select; + r->post_select = http_recv_post_select; r->shutdown = http_shutdown; r->parse_config = http_recv_parse_config; }