X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=udp_recv.c;h=bdc25411ef7a5c17cdde8509ab9e6e8de01db10c;hp=b2563726943b99c77489e3141b929e7f83fad93d;hb=35993f0cd49206666262cfdd2be89af41ec28a1d;hpb=141b0fd36edcad22ee07c65b101e90064d8567b6 diff --git a/udp_recv.c b/udp_recv.c index b2563726..bdc25411 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2005-2008 Andre Noll + * Copyright (C) 2005-2009 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ @@ -8,6 +8,7 @@ #include #include "para.h" +#include "error.h" #include "portable_io.h" #include "udp_header.h" #include "list.h" @@ -15,7 +16,6 @@ #include "ggo.h" #include "recv.h" #include "udp_recv.cmdline.h" -#include "error.h" #include "audiod.h" #include "string.h" #include "net.h" @@ -43,6 +43,10 @@ struct private_udp_recv_data { int have_header; /** The socket file descriptor. */ int fd; + /** Non-zero on short reads. */ + uint16_t need_more; + /** Copied from the first audio header received. */ + uint16_t stream_type; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -53,9 +57,67 @@ static void udp_recv_pre_select(struct sched *s, struct task *t) para_fd_set(purd->fd, &s->rfds, &s->max_fileno); } -static int enough_space(size_t packet_size, size_t loaded) +static int enough_space(size_t nbytes, size_t loaded) +{ + return nbytes + loaded < UDP_RECV_CHUNK_SIZE; +} + +/* + * Perform some sanity checks on an udp audio file header. + * + * return: negative on error, 0: discard data, 1: use data + */ +static int examine_audio_header(struct private_udp_recv_data *purd, + struct udp_audio_header *uah, size_t packet_size) +{ + /* payload_len includes header */ + if (uah->payload_len < uah->header_len) + return -E_UDP_BAD_HEADER; + switch (uah->packet_type) { + case UDP_EOF_PACKET: + return -E_RECV_EOF; + case UDP_BOF_PACKET: + purd->have_header = 1; + /* fall through */ + case UDP_DATA_PACKET: + if (uah->header_len) /* header in no-header packet */ + return -E_UDP_BAD_HEADER; + break; + case UDP_HEADER_PACKET: + if (!uah->header_len) /** no header in header packet */ + return -E_UDP_BAD_HEADER; + break; + default: /* bad packet type */ + return -E_UDP_BAD_HEADER; + } + /* check stream type */ + if (uah->stream_type != UDP_PLAIN_STREAM && + uah->stream_type != UDP_HEADER_STREAM) + return -E_UDP_BAD_STREAM_TYPE; + if (purd->stream_type == UDP_UNKNOWN_STREAM) + purd->stream_type = uah->stream_type; + /* stream type must not change */ + if (uah->stream_type != purd->stream_type) + return -E_UDP_BAD_STREAM_TYPE; + if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM) + /* can't use the data, wait for header packet */ + return 0; + if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN) + /* we read only a part of the package */ + purd->need_more = uah->payload_len + + UDP_AUDIO_HEADER_LEN - packet_size; + return 1; +} + +static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) { - return packet_size + loaded < UDP_RECV_CHUNK_SIZE + UDP_AUDIO_HEADER_LEN; + if (!len) + return 1; + if (!enough_space(len, rn->loaded)) + return -E_UDP_OVERRUN; + memcpy(rn->buf + rn->loaded, buf, len); + rn->loaded += len; + return 1; } static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) @@ -64,8 +126,10 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) struct private_udp_recv_data *purd = rn->private_data; int ret; char tmpbuf[UDP_RECV_CHUNK_SIZE]; + uint16_t data_len; + char *data_buf; size_t packet_size; - uint8_t stream_type, packet_type; + struct udp_audio_header uah; if (rn->output_error && *rn->output_error < 0) { t->error = *rn->output_error; @@ -75,80 +139,54 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) return; ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE); if (ret < 0) { - t->error = 0; - if (errno != EINTR && errno != EAGAIN) - t->error = -ERRNO_TO_PARA_ERROR(errno); - return; - } - packet_size = ret; - if (packet_size < UDP_AUDIO_HEADER_LEN) { - t->error = -E_UDP_SHORT_PACKET; /* FIXME: We shouldn't fail here */ + if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN)) + goto success; + t->error = ret; return; } - if (udp_check_magic(tmpbuf, packet_size) < 0) { - t->error = -E_UDP_NO_MAGIC; + t->error = -E_RECV_EOF; + if (!ret) return; - } - stream_type = udp_read_stream_type(tmpbuf); - packet_type = udp_read_packet_type(tmpbuf); -// PARA_INFO_LOG("packet type: %d, stream type: %d," -// " loaded: %u\n", packet_type, -// (unsigned) stream_type, rn->loaded); - switch (packet_type) { - uint16_t header_len, payload_len; - case UDP_EOF_PACKET: - t->error = -E_RECV_EOF; - return; - case UDP_BOF_PACKET: - PARA_INFO_LOG("bof (%zu)\n", packet_size); - purd->have_header = 1; - /* fall through */ - case UDP_DATA_PACKET: - if (!purd->have_header && stream_type == UDP_HEADER_STREAM) - /* can't use the data, wait for header */ - goto success; - if (!enough_space(packet_size, rn->loaded)) { - t->error = -E_OVERRUN; - return; - } - if (packet_size > UDP_AUDIO_HEADER_LEN) { - memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN, - packet_size - UDP_AUDIO_HEADER_LEN); - rn->loaded += packet_size - UDP_AUDIO_HEADER_LEN; - } - goto success; - case UDP_HEADER_PACKET: - if (!enough_space(packet_size, rn->loaded)) { - t->error = -E_OVERRUN; - return; + packet_size = ret; + for (;;) { + uint16_t num; + + if (!purd->need_more) { + ret = read_udp_audio_header(tmpbuf, packet_size, &uah); + if (ret >= 0) + break; + goto success; /* drop data */ } - header_len = udp_read_header_len(tmpbuf); - if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) { - t->error = -E_INVALID_HEADER; + num = PARA_MIN(purd->need_more, (uint16_t)packet_size); + assert(num > 0); + t->error = add_rn_output(rn, tmpbuf, num); + if (t->error < 0) return; - } -// PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n", -// packet_size, header_len); - if (!purd->have_header) { - purd->have_header = 1; - rn->loaded = header_len; - memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN, - rn->loaded); -// sleep(1); + purd->need_more -= num; + if (packet_size <= num) goto success; + packet_size -= num; + memmove(tmpbuf, tmpbuf + num, packet_size); + } + assert(!purd->need_more); + t->error = examine_audio_header(purd, &uah, packet_size); + if (t->error <= 0) + return; + data_len = uah.payload_len; + data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN; + if (uah.packet_type == UDP_HEADER_PACKET) { + if (purd->have_header) { /* skip header */ + data_buf += uah.header_len; + data_len -= uah.header_len; + } else { /* only use the header */ + purd->have_header = 1; + data_len = uah.header_len; } - payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len; - if (rn->loaded + payload_len > UDP_RECV_CHUNK_SIZE) { - t->error = -E_OVERRUN; - return; - } - if (payload_len) - memcpy(rn->buf + rn->loaded, tmpbuf - + (packet_size - payload_len), payload_len); - rn->loaded += payload_len; } + t->error = add_rn_output(rn, data_buf, data_len); + return; success: - t->error = 0; + t->error = 1; } static void udp_shutdown(void) @@ -190,14 +228,19 @@ static int udp_recv_open(struct receiver_node *rn) purd = rn->private_data; ret = create_udp_recv_socket(c->host_arg, c->port_arg); if (ret < 0) - return ret; + goto err; purd->fd = ret; ret = mark_fd_nonblocking(purd->fd); if (ret < 0) - return ret; + goto err; + purd->stream_type = UDP_UNKNOWN_STREAM; PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, c->port_arg, purd->fd); return purd->fd; +err: + free(rn->private_data); + free(rn->buf); + return ret; } /**