From: Andre Noll Date: Sat, 10 Jan 2009 02:48:55 +0000 (+0100) Subject: udp sender/receiver improvements. X-Git-Tag: v0.3.4~75^2~32 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=9692b4241bc316665bd5b6810ea8af83057a0a89;hp=6bdac07456cb5872f824028912d1049883a9c21f udp sender/receiver improvements. --- diff --git a/udp_header.h b/udp_header.h index 55518057..24f6bb87 100644 --- a/udp_header.h +++ b/udp_header.h @@ -25,10 +25,10 @@ enum udp_stream_type { /** The possible packet types. */ enum udp_audio_packet_type { - /** End of file. */ - UDP_EOF_PACKET, /** Beginning of file. */ UDP_BOF_PACKET, + /** End of file. */ + UDP_EOF_PACKET, /** Combined header/data packet (ogg only). */ UDP_HEADER_PACKET, /** Packet contains only audio file data. */ @@ -107,8 +107,7 @@ _static_inline_ void udp_write_packet_type(char *buf, uint8_t type) * * \param buf The buffer. * - * \return One of the four differnt packet types. - * \sa \ref udp_stream_type. + * \return The packet type, see \ref udp_stream_type. */ _static_inline_ uint8_t udp_read_packet_type(char *buf) { diff --git a/udp_recv.c b/udp_recv.c index b3ea83c6..66dc36b9 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -53,9 +53,9 @@ static void udp_recv_pre_select(struct sched *s, struct task *t) para_fd_set(purd->fd, &s->rfds, &s->max_fileno); } -static int enough_space(size_t packet_size, size_t loaded) +static int enough_space(size_t nbytes, size_t loaded) { - return packet_size + loaded < UDP_RECV_CHUNK_SIZE + UDP_AUDIO_HEADER_LEN; + return nbytes + loaded < UDP_RECV_CHUNK_SIZE; } static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) @@ -75,20 +75,21 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) return; ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE); if (ret < 0) { - t->error = 0; - if (errno != EINTR && errno != EAGAIN) - t->error = -ERRNO_TO_PARA_ERROR(errno); + if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN)) + goto success; + t->error = ret; return; } + t->error = -E_RECV_EOF; + if (!ret) + return; packet_size = ret; - if (packet_size < UDP_AUDIO_HEADER_LEN) { - t->error = -E_UDP_SHORT_PACKET; /* FIXME: We shouldn't fail here */ + t->error = -E_UDP_SHORT_PACKET; + if (packet_size < UDP_AUDIO_HEADER_LEN) return; - } - if (udp_check_magic(tmpbuf, packet_size) < 0) { - t->error = -E_UDP_NO_MAGIC; + t->error = -E_UDP_NO_MAGIC; + if (udp_check_magic(tmpbuf, packet_size) < 0) return; - } stream_type = udp_read_stream_type(tmpbuf); packet_type = udp_read_packet_type(tmpbuf); // PARA_INFO_LOG("packet type: %d, stream type: %d," @@ -96,6 +97,7 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) // (unsigned) stream_type, rn->loaded); switch (packet_type) { uint16_t header_len, payload_len; + case UDP_EOF_PACKET: t->error = -E_RECV_EOF; return; @@ -105,23 +107,19 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) /* fall through */ case UDP_DATA_PACKET: if (!purd->have_header && stream_type == UDP_HEADER_STREAM) - /* can't use the data, wait for header */ + /* can't use the data, wait for header */ goto success; - if (!enough_space(packet_size, rn->loaded)) { - t->error = -E_OVERRUN; + payload_len = packet_size - UDP_AUDIO_HEADER_LEN; + if (!payload_len) + goto success; + t->error = -E_OVERRUN; + if (!enough_space(payload_len, rn->loaded)) return; - } - if (packet_size > UDP_AUDIO_HEADER_LEN) { - memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN, - packet_size - UDP_AUDIO_HEADER_LEN); - rn->loaded += packet_size - UDP_AUDIO_HEADER_LEN; - } + memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN, + payload_len); + rn->loaded += payload_len; goto success; case UDP_HEADER_PACKET: - if (!enough_space(packet_size, rn->loaded)) { - t->error = -E_OVERRUN; - return; - } header_len = udp_read_header_len(tmpbuf); if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) { t->error = -E_INVALID_HEADER; @@ -130,21 +128,23 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) // PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n", // packet_size, header_len); if (!purd->have_header) { + t->error = -E_OVERRUN; + if (!enough_space(header_len, rn->loaded)) + return; purd->have_header = 1; rn->loaded = header_len; memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN, rn->loaded); -// sleep(1); goto success; } payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len; - if (rn->loaded + payload_len > UDP_RECV_CHUNK_SIZE) { - t->error = -E_OVERRUN; + if (!payload_len) + goto success; + t->error = -E_OVERRUN; + if (!enough_space(payload_len, rn->loaded)) return; - } - if (payload_len) - memcpy(rn->buf + rn->loaded, tmpbuf - + (packet_size - payload_len), payload_len); + memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN + + header_len, payload_len); rn->loaded += payload_len; } success: diff --git a/udp_send.c b/udp_send.c index b5a0d37d..61cab430 100644 --- a/udp_send.c +++ b/udp_send.c @@ -65,17 +65,26 @@ static void udp_send_buf(char *buf, size_t len) if (ut->fd < 0) continue; ret = write_all(ut->fd, buf, &written); - if (ret < 0) + if (ret < 0) /* TODO: Use chunk queueing */ return udp_delete_target(ut, "send error"); if (written != len) PARA_WARNING_LOG("short write %zu/%zu\n", written, len); } } -static void udp_init_session(struct udp_target *ut) +static int udp_init_session(struct udp_target *ut) { + int ret; + + if (ut->fd >= 0) /* nothing to do */ + return 0; PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port); - ut->fd = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10); + /* TODO: Make ttl configurable. */ + ret = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10); + if (ret < 0) + return ret; + ut->fd = ret; + return mark_fd_nonblocking(ut->fd); } static void udp_shutdown_targets(void) @@ -84,12 +93,13 @@ static void udp_shutdown_targets(void) struct udp_target *ut, *tmp; udp_write_packet_type(buf, UDP_EOF_PACKET); + udp_write_magic(buf); list_for_each_entry_safe(ut, tmp, &targets, node) { if (ut->fd < 0) continue; - PARA_INFO_LOG("sending eof to udp target %s:%d\n", - TARGET_ADDR(ut), ut->port); write(ut->fd, buf, UDP_AUDIO_HEADER_LEN); + close(ut->fd); + ut->fd = -1; } } @@ -113,7 +123,8 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk { struct udp_target *ut, *tmp; size_t sendbuf_len; - int packet_type = UDP_DATA_PACKET; + uint8_t packet_type = UDP_DATA_PACKET; + int ret; char *sendbuf; struct timeval *chunk_tv; uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; @@ -130,8 +141,9 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk if (list_empty(&targets)) return; list_for_each_entry_safe(ut, tmp, &targets, node) { - if (ut->fd < 0) - udp_init_session(ut); + ret = udp_init_session(ut); + if (ret < 0) + udp_delete_target(ut, para_strerror(-ret)); } if (!need_extra_header(current_chunk)) header_len = 0;