X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=udp_recv.c;h=bb166c5147ef18603fc87e9f19b0b112084cea07;hp=f9782985d113d390c70300d099546066203f188d;hb=b7243e074d583c5977bf89b0a2d8ac4635aebbb6;hpb=2aa98426fca67a8b9c075ca7efea54aef18c0380 diff --git a/udp_recv.c b/udp_recv.c index f9782985..bb166c51 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -22,6 +22,7 @@ #include "string.h" #include "net.h" #include "fd.h" +#include "buffer_tree.h" /** The size of the receiver node buffer. */ #define UDP_RECV_CHUNK_SIZE (128 * 1024) @@ -39,7 +40,13 @@ static void udp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_udp_recv_data *purd = rn->private_data; + int ret; + if (rn->btrn) { + ret = generic_recv_pre_select(s, t); + if (ret <= 0) + return; + } para_fd_set(purd->fd, &s->rfds, &s->max_fileno); } @@ -59,7 +66,7 @@ static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) return 1; } -static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) +static void udp_recv_post_select_nobtr(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_udp_recv_data *purd = rn->private_data; @@ -94,6 +101,54 @@ success: t->error = 1; } +#define UDP_RECV_READ_BUF_SIZE 1500 + +static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct private_udp_recv_data *purd = rn->private_data; + struct btr_node *btrn = rn->btrn; + int ret; + char *buf = NULL; + size_t packet_size; + + t->error = 0; + ret = btr_node_status(btrn, 0, BTR_NT_ROOT); + if (ret < 0) + goto err; + if (ret == 0) + return; + if (!FD_ISSET(purd->fd, &s->rfds)) + return; + buf = para_malloc(UDP_RECV_READ_BUF_SIZE); + ret = recv_bin_buffer(purd->fd, buf, UDP_RECV_READ_BUF_SIZE); + if (ret == 0) + ret = -E_RECV_EOF; + if (ret < 0) + goto err; + packet_size = ret; + if (packet_size >= FEC_EOF_PACKET_LEN) { + if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) { + ret = -E_RECV_EOF; + goto err; + } + } + btr_add_output(buf, ret, btrn); + return; +err: + free(buf); + btr_remove_node(btrn); + t->error = ret; +} + +static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + if (rn->btrn) + return udp_recv_post_select_btr(s, t); + udp_recv_post_select_nobtr(s, t); +} + static void udp_shutdown(void) { return; @@ -107,6 +162,7 @@ static void udp_recv_close(struct receiver_node *rn) close(purd->fd); free(rn->private_data); free(rn->buf); + udp_recv_cmdline_parser_free(rn->conf); } static void *udp_recv_parse_config(int argc, char **argv)