X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=udp_recv.c;h=eb641d837825b7e18a100717c919a340eaac41a0;hp=bb166c5147ef18603fc87e9f19b0b112084cea07;hb=5c0c60e0efe860962c2d9132f4aef3d9e43b25bc;hpb=5496ca9d07ede9aaa3afd86b60cdf16ed8ccca2b diff --git a/udp_recv.c b/udp_recv.c index bb166c51..eb641d83 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -18,7 +18,6 @@ #include "ggo.h" #include "recv.h" #include "udp_recv.cmdline.h" -#include "audiod.h" #include "string.h" #include "net.h" #include "fd.h" @@ -34,6 +33,7 @@ struct private_udp_recv_data { /** The socket file descriptor. */ int fd; + struct btr_pool *btrp; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -101,8 +101,6 @@ success: t->error = 1; } -#define UDP_RECV_READ_BUF_SIZE 1500 - static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); @@ -110,7 +108,7 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) struct btr_node *btrn = rn->btrn; int ret; char *buf = NULL; - size_t packet_size; + size_t packet_size, bufsize; t->error = 0; ret = btr_node_status(btrn, 0, BTR_NT_ROOT); @@ -120,8 +118,11 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) return; if (!FD_ISSET(purd->fd, &s->rfds)) return; - buf = para_malloc(UDP_RECV_READ_BUF_SIZE); - ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE); + bufsize = btr_pool_get_buffer(purd->btrp, &buf); + ret = -E_UDP_OVERRUN; + if (bufsize == 0) + goto err; + ret = recv_bin_buffer(purd->fd, buf, bufsize); if (ret == 0) ret = -E_RECV_EOF; if (ret < 0) @@ -129,16 +130,18 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) packet_size = ret; if (packet_size >= FEC_EOF_PACKET_LEN) { if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) { + PARA_INFO_LOG("received eof packet\n"); ret = -E_RECV_EOF; goto err; } } - btr_add_output(buf, ret, btrn); + btr_add_output_pool(purd->btrp, packet_size, btrn); return; err: - free(buf); btr_remove_node(btrn); t->error = ret; + close(purd->fd); + purd->fd = -1; } static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) @@ -160,9 +163,9 @@ static void udp_recv_close(struct receiver_node *rn) if (purd->fd >= 0) close(purd->fd); + btr_pool_free(purd->btrp); free(rn->private_data); free(rn->buf); - udp_recv_cmdline_parser_free(rn->conf); } static void *udp_recv_parse_config(int argc, char **argv) @@ -262,10 +265,13 @@ static int udp_recv_open(struct receiver_node *rn) } ret = mark_fd_nonblocking(purd->fd); - if (ret < 0) + if (ret < 0) { + close(purd->fd); goto err; - PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, + } + PARA_INFO_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, c->port_arg, purd->fd); + purd->btrp = btr_pool_new("udp_recv", 320 * 1024); return purd->fd; err: free(rn->private_data); @@ -273,6 +279,11 @@ err: return ret; } +static void udp_recv_free_config(void *conf) +{ + udp_recv_cmdline_parser_free(conf); +} + /** * The init function of the udp receiver. * @@ -291,6 +302,7 @@ void udp_recv_init(struct receiver *r) r->pre_select = udp_recv_pre_select; r->post_select = udp_recv_post_select; r->parse_config = udp_recv_parse_config; + r->free_config = udp_recv_free_config; r->help = (struct ggo_help) { .short_help = udp_recv_args_info_help, .detailed_help = udp_recv_args_info_detailed_help