From: Andre Noll Date: Tue, 20 Sep 2011 22:36:08 +0000 (+0200) Subject: recv: Make ->btrp and ->fd generic. X-Git-Tag: v0.4.9~4^2~1 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=a19ff95e6ec4b76c13446e0687fcf2ae537529ab;hp=fddf60223e26c1dc1e02b432652c4ca6a237c278;ds=sidebyside recv: Make ->btrp and ->fd generic. All three receivers maintain a file descriptor and a buffer tree pool per connection. Currently these are part of the receiver node's private_data structure, which is a needless code duplication. This patch moves both fields to the generic struct receiver_node. Since the private_data structure of the udp and dccp receivers contained no other fields, this allows to get rid of it completely for these two receivers. --- diff --git a/dccp_recv.c b/dccp_recv.c index a9eab006..41518539 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -27,33 +27,15 @@ #include "dccp_recv.cmdline.h" -/** - * data specific to the dccp receiver - * - * \sa receiver receiver_node - */ -struct private_dccp_recv_data { - /** the file descriptor for the dccp socket */ - int fd; - struct btr_pool *btrp; -}; - static void dccp_recv_close(struct receiver_node *rn) { - struct private_dccp_recv_data *pdd = rn->private_data; - - if (!pdd) - return; - if (pdd->fd > 0) - close(pdd->fd); - btr_pool_free(pdd->btrp); - free(pdd); - rn->private_data = NULL; + if (rn->fd > 0) + close(rn->fd); + btr_pool_free(rn->btrp); } static int dccp_recv_open(struct receiver_node *rn) { - struct private_dccp_recv_data *pdd; struct dccp_recv_args_info *conf = rn->conf; struct flowopts *fo = NULL; uint8_t *ccids = NULL; @@ -87,9 +69,8 @@ static int dccp_recv_open(struct receiver_node *rn) ret = mark_fd_nonblocking(fd); if (ret < 0) goto err; - rn->private_data = pdd = para_calloc(sizeof(struct private_dccp_recv_data)); - pdd->btrp = btr_pool_new("dccp_recv", 320 * 1024); - pdd->fd = fd; + rn->btrp = btr_pool_new("dccp_recv", 320 * 1024); + rn->fd = fd; return 1; err: close(fd); @@ -136,18 +117,16 @@ static void *dccp_recv_parse_config(int argc, char **argv) static void dccp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_dccp_recv_data *pdd = rn->private_data; t->error = 0; if (generic_recv_pre_select(s, t) <= 0) return; - para_fd_set(pdd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } static void dccp_recv_post_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_dccp_recv_data *pdd = rn->private_data; struct btr_node *btrn = rn->btrn; struct iovec iov[2]; int ret, iovcnt; @@ -156,18 +135,18 @@ static void dccp_recv_post_select(struct sched *s, struct task *t) ret = btr_node_status(btrn, 0, BTR_NT_ROOT); if (ret <= 0) goto out; - iovcnt = btr_pool_get_buffers(pdd->btrp, iov); + iovcnt = btr_pool_get_buffers(rn->btrp, iov); ret = -E_DCCP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(pdd->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); if (num_bytes == 0) goto out; if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */ - btr_add_output_pool(pdd->btrp, num_bytes, btrn); + btr_add_output_pool(rn->btrp, num_bytes, btrn); else { /* both buffers contain data */ - btr_add_output_pool(pdd->btrp, iov[0].iov_len, btrn); - btr_add_output_pool(pdd->btrp, num_bytes - iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); } out: if (ret >= 0) diff --git a/http_recv.c b/http_recv.c index 5717692c..d4fe29ad 100644 --- a/http_recv.c +++ b/http_recv.c @@ -44,24 +44,6 @@ struct private_http_recv_data { * \sa receiver::open, receiver_node. */ enum http_recv_status status; - /** - * The file descriptor used for receiving the http stream. - * - * The pre_select function of the http receiver adds this file descriptor to - * the set of file descriptors which are checked for reading/writing (depending - * on the current status) by the select loop of the application (para_audiod or - * para_recv). - * - * The post_select function of the http receiver uses \a fd, if ready, to - * establish the http connection, and updates \a status according to the new - * state of the connection. As soon as \a status is \p HTTP_STREAMING, \a fd is - * going to be only checked for reading. If data is available, it is read into - * the output buffer of the receiver node by post_select. - * - * \sa receiver::pre_select receiver::post_select receiver_node, http_recv_status - */ - int fd; - struct btr_pool *btrp; }; static char *make_request_msg(void) @@ -82,11 +64,16 @@ static void http_recv_pre_select(struct sched *s, struct task *t) if (generic_recv_pre_select(s, t) <= 0) return; if (phd->status == HTTP_CONNECTED) - para_fd_set(phd->fd, &s->wfds, &s->max_fileno); + para_fd_set(rn->fd, &s->wfds, &s->max_fileno); else - para_fd_set(phd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } +/* + * Establish the http connection. If already established, fill the buffer pool + * area with data read from the socket. In any case, update the state of the + * connection if necessary. + */ static void http_recv_post_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); @@ -104,11 +91,11 @@ static void http_recv_post_select(struct sched *s, struct task *t) return; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, &s->wfds)) + if (!FD_ISSET(rn->fd, &s->wfds)) return; rq = make_request_msg(); PARA_INFO_LOG("sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + ret = send_va_buffer(rn->fd, "%s", rq); free(rq); if (ret < 0) goto err; @@ -116,7 +103,7 @@ static void http_recv_post_select(struct sched *s, struct task *t) return; } if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = read_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); + ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); if (ret < 0) goto err; if (ret == 0) @@ -126,12 +113,12 @@ static void http_recv_post_select(struct sched *s, struct task *t) return; } ret = -E_HTTP_RECV_OVERRUN; - sz = btr_pool_get_buffer(phd->btrp, &buf); + sz = btr_pool_get_buffer(rn->btrp, &buf); if (sz == 0) goto err; - ret = read_nonblock(phd->fd, buf, sz, &s->rfds, &n); + ret = read_nonblock(rn->fd, buf, sz, &s->rfds, &n); if (n > 0) - btr_add_output_pool(phd->btrp, n, btrn); + btr_add_output_pool(rn->btrp, n, btrn); if (ret >= 0) return; err: @@ -141,10 +128,8 @@ err: static void http_recv_close(struct receiver_node *rn) { - struct private_http_recv_data *phd = rn->private_data; - - close(phd->fd); - btr_pool_free(phd->btrp); + close(rn->fd); + btr_pool_free(rn->btrp); free(rn->private_data); } @@ -174,9 +159,9 @@ static int http_recv_open(struct receiver_node *rn) return ret; } rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data)); - phd->fd = fd; + rn->fd = fd; phd->status = HTTP_CONNECTED; - phd->btrp = btr_pool_new("http_recv", 320 * 1024); + rn->btrp = btr_pool_new("http_recv", 320 * 1024); return 1; } diff --git a/recv.h b/recv.h index 7555dfb4..92b63d27 100644 --- a/recv.h +++ b/recv.h @@ -20,6 +20,21 @@ struct receiver_node { struct task task; /** The receiver node is always the root of the buffer tree. */ struct btr_node *btrn; + /** Each receiver node maintains a buffer pool for the received data. */ + struct btr_pool *btrp; + /** + * The file descriptor to receive the stream. + * + * The pre_select function of the receiver adds this file descriptor to + * the set of file descriptors which are watched for readability or + * writability, depending on the state of the connection (if any). + * + * If \a fd is readable, the post_select function of the receiver reads + * data from this fd into the buffer pool area of \a btrp. + * + * \sa \ref receiver. + */ + int fd; }; /** diff --git a/udp_recv.c b/udp_recv.c index 45d24eae..964b60d4 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -22,25 +22,13 @@ #include "fd.h" #include "buffer_tree.h" -/** - * Data specific to the udp receiver. - * - * \sa \ref receiver, \ref receiver_node. - */ -struct private_udp_recv_data { - /** The socket file descriptor. */ - int fd; - struct btr_pool *btrp; -}; - static void udp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_udp_recv_data *purd = rn->private_data; if (generic_recv_pre_select(s, t) <= 0) return; - para_fd_set(purd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } static int udp_check_eof(size_t sz, struct iovec iov[2]) @@ -64,7 +52,6 @@ static int udp_check_eof(size_t sz, struct iovec iov[2]) static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_udp_recv_data *purd = rn->private_data; struct btr_node *btrn = rn->btrn; size_t num_bytes; struct iovec iov[2]; @@ -74,11 +61,11 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) ret = btr_node_status(btrn, 0, BTR_NT_ROOT); if (ret <= 0) goto out; - iovcnt = btr_pool_get_buffers(purd->btrp, iov); + iovcnt = btr_pool_get_buffers(rn->btrp, iov); ret = -E_UDP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(purd->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); if (num_bytes == 0) goto out; readv_ret = ret; @@ -86,10 +73,10 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) if (ret < 0) goto out; if (iov[0].iov_len >= num_bytes) - btr_add_output_pool(purd->btrp, num_bytes, btrn); + btr_add_output_pool(rn->btrp, num_bytes, btrn); else { /* both buffers contain data */ - btr_add_output_pool(purd->btrp, iov[0].iov_len, btrn); - btr_add_output_pool(purd->btrp, num_bytes - iov[0].iov_len, + btr_add_output_pool(rn->btrp, iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); } ret = readv_ret; @@ -98,18 +85,15 @@ out: return; btr_remove_node(btrn); t->error = ret; - close(purd->fd); - purd->fd = -1; + close(rn->fd); + rn->fd = -1; } static void udp_recv_close(struct receiver_node *rn) { - struct private_udp_recv_data *purd = rn->private_data; - - if (purd->fd >= 0) - close(purd->fd); - btr_pool_free(purd->btrp); - free(rn->private_data); + if (rn->fd >= 0) + close(rn->fd); + btr_pool_free(rn->btrp); } static void *udp_recv_parse_config(int argc, char **argv) @@ -188,36 +172,31 @@ err: static int udp_recv_open(struct receiver_node *rn) { - struct private_udp_recv_data *purd; struct udp_recv_args_info *c = rn->conf; char *iface = c->iface_given ? c->iface_arg : NULL; int ret; - rn->private_data = para_calloc(sizeof(struct private_udp_recv_data)); - purd = rn->private_data; - ret = makesock(IPPROTO_UDP, 1, c->host_arg, c->port_arg, NULL); if (ret < 0) goto err; - purd->fd = ret; + rn->fd = ret; - ret = mcast_receiver_setup(purd->fd, iface); + ret = mcast_receiver_setup(rn->fd, iface); if (ret < 0) { - close(purd->fd); + close(rn->fd); goto err; } - ret = mark_fd_nonblocking(purd->fd); + ret = mark_fd_nonblocking(rn->fd); if (ret < 0) { - close(purd->fd); + close(rn->fd); goto err; } PARA_INFO_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, - c->port_arg, purd->fd); - purd->btrp = btr_pool_new("udp_recv", 320 * 1024); - return purd->fd; + c->port_arg, rn->fd); + rn->btrp = btr_pool_new("udp_recv", 320 * 1024); + return rn->fd; err: - free(rn->private_data); return ret; }