summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
ff63e4b)
Although fecdec, the filter receiving the output of the udp receiver,
does not strictly benefit from this change, using buffer pools in
receivers has the advantage that we don't need to guess the input
buffer size or call realloc() after the receive to shrink the buffer.
struct private_udp_recv_data {
/** The socket file descriptor. */
int fd;
struct private_udp_recv_data {
/** The socket file descriptor. */
int fd;
};
static void udp_recv_pre_select(struct sched *s, struct task *t)
};
static void udp_recv_pre_select(struct sched *s, struct task *t)
-#define UDP_RECV_READ_BUF_SIZE 1500
-
static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
{
struct receiver_node *rn = container_of(t, struct receiver_node, task);
static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t)
{
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct btr_node *btrn = rn->btrn;
int ret;
char *buf = NULL;
struct btr_node *btrn = rn->btrn;
int ret;
char *buf = NULL;
+ size_t packet_size, bufsize;
t->error = 0;
ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
t->error = 0;
ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
return;
if (!FD_ISSET(purd->fd, &s->rfds))
return;
return;
if (!FD_ISSET(purd->fd, &s->rfds))
return;
- buf = para_malloc(UDP_RECV_READ_BUF_SIZE);
- ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE);
+ bufsize = btr_pool_get_buffer(purd->btrp, &buf);
+ ret = -E_UDP_OVERRUN;
+ if (bufsize == 0)
+ goto err;
+ ret = recv_bin_buffer(purd->fd, buf, bufsize);
if (ret == 0)
ret = -E_RECV_EOF;
if (ret < 0)
if (ret == 0)
ret = -E_RECV_EOF;
if (ret < 0)
packet_size = ret;
if (packet_size >= FEC_EOF_PACKET_LEN) {
if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) {
packet_size = ret;
if (packet_size >= FEC_EOF_PACKET_LEN) {
if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) {
+ PARA_CRIT_LOG("%p: eof\n", rn);
ret = -E_RECV_EOF;
goto err;
}
}
ret = -E_RECV_EOF;
goto err;
}
}
- btr_add_output(buf, ret, btrn);
+ btr_pool_allocate(purd->btrp, packet_size);
+ btr_add_output_pool(purd->btrp, buf, packet_size, btrn);
btr_remove_node(btrn);
t->error = ret;
btr_remove_node(btrn);
t->error = ret;
+ close(purd->fd);
+ purd->fd = -1;
}
static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
}
static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
if (purd->fd >= 0)
close(purd->fd);
if (purd->fd >= 0)
close(purd->fd);
+ PARA_CRIT_LOG("%p: close\n", rn);
+ btr_pool_free(purd->btrp);
free(rn->private_data);
free(rn->buf);
free(rn->private_data);
free(rn->buf);
- udp_recv_cmdline_parser_free(rn->conf);
}
static void *udp_recv_parse_config(int argc, char **argv)
}
static void *udp_recv_parse_config(int argc, char **argv)
}
ret = mark_fd_nonblocking(purd->fd);
}
ret = mark_fd_nonblocking(purd->fd);
+ if (ret < 0) {
+ close(purd->fd);
- PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
+ }
+ PARA_CRIT_LOG("rn %p: receiving from %s:%d, fd=%d\n", rn, c->host_arg,
+ purd->btrp = btr_pool_new(320 * 1024);
return purd->fd;
err:
free(rn->private_data);
return purd->fd;
err:
free(rn->private_data);