udp_recv: Use buffer pool API.
authorAndre Noll <maan@systemlinux.org>
Sat, 9 Jan 2010 17:41:07 +0000 (18:41 +0100)
committerAndre Noll <maan@systemlinux.org>
Sat, 9 Jan 2010 17:41:07 +0000 (18:41 +0100)
Although fecdec, the filter receiving the output of the udp receiver,
does not strictly benefit from this change, using buffer pools in
receivers has the advantage that we don't need to guess the input
buffer size or call realloc() after the receive to shrink the buffer.

udp_recv.c

index f68a7100b99128a29ed18cd3e5fe04b2b4263327..0c5a3abf3fd58546ea187fee33516bd761973731 100644 (file)
@@ -33,6 +33,7 @@
 struct private_udp_recv_data {
        /** The socket file descriptor. */
        int fd;
 struct private_udp_recv_data {
        /** The socket file descriptor. */
        int fd;
+       struct btr_pool *btrp;
 };
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
 };
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
@@ -100,8 +101,6 @@ success:
        t->error = 1;
 }
 
        t->error = 1;
 }
 
-#define UDP_RECV_READ_BUF_SIZE 1500
-
 static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
 {
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
 static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
 {
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
@@ -109,7 +108,7 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
        struct btr_node *btrn = rn->btrn;
        int ret;
        char *buf = NULL;
        struct btr_node *btrn = rn->btrn;
        int ret;
        char *buf = NULL;
-       size_t packet_size;
+       size_t packet_size, bufsize;
 
        t->error = 0;
        ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
 
        t->error = 0;
        ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
@@ -119,8 +118,11 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
                return;
        if (!FD_ISSET(purd->fd, &s->rfds))
                return;
                return;
        if (!FD_ISSET(purd->fd, &s->rfds))
                return;
-       buf = para_malloc(UDP_RECV_READ_BUF_SIZE);
-       ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE);
+       bufsize = btr_pool_get_buffer(purd->btrp, &buf);
+       ret = -E_UDP_OVERRUN;
+       if (bufsize == 0)
+               goto err;
+       ret = recv_bin_buffer(purd->fd, buf, bufsize);
        if (ret == 0)
                ret = -E_RECV_EOF;
        if (ret < 0)
        if (ret == 0)
                ret = -E_RECV_EOF;
        if (ret < 0)
@@ -128,16 +130,19 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
        packet_size = ret;
        if (packet_size >= FEC_EOF_PACKET_LEN) {
                if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) {
        packet_size = ret;
        if (packet_size >= FEC_EOF_PACKET_LEN) {
                if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) {
+                       PARA_CRIT_LOG("%p: eof\n", rn);
                        ret = -E_RECV_EOF;
                        goto err;
                }
        }
                        ret = -E_RECV_EOF;
                        goto err;
                }
        }
-       btr_add_output(buf, ret, btrn);
+       btr_pool_allocate(purd->btrp, packet_size);
+       btr_add_output_pool(purd->btrp, buf, packet_size, btrn);
        return;
 err:
        return;
 err:
-       free(buf);
        btr_remove_node(btrn);
        t->error = ret;
        btr_remove_node(btrn);
        t->error = ret;
+       close(purd->fd);
+       purd->fd = -1;
 }
 
 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
 }
 
 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
@@ -159,9 +164,10 @@ static void udp_recv_close(struct receiver_node *rn)
 
        if (purd->fd >= 0)
                close(purd->fd);
 
        if (purd->fd >= 0)
                close(purd->fd);
+       PARA_CRIT_LOG("%p: close\n", rn);
+       btr_pool_free(purd->btrp);
        free(rn->private_data);
        free(rn->buf);
        free(rn->private_data);
        free(rn->buf);
-       udp_recv_cmdline_parser_free(rn->conf);
 }
 
 static void *udp_recv_parse_config(int argc, char **argv)
 }
 
 static void *udp_recv_parse_config(int argc, char **argv)
@@ -261,10 +267,13 @@ static int udp_recv_open(struct receiver_node *rn)
        }
 
        ret = mark_fd_nonblocking(purd->fd);
        }
 
        ret = mark_fd_nonblocking(purd->fd);
-       if (ret < 0)
+       if (ret < 0) {
+               close(purd->fd);
                goto err;
                goto err;
-       PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
+       }
+       PARA_CRIT_LOG("rn %p: receiving from %s:%d, fd=%d\n", rn, c->host_arg,
                c->port_arg, purd->fd);
                c->port_arg, purd->fd);
+       purd->btrp = btr_pool_new(320 * 1024);
        return purd->fd;
 err:
        free(rn->private_data);
        return purd->fd;
 err:
        free(rn->private_data);