Major udp sender/receiver improvements.
authorAndre Noll <maan@systemlinux.org>
Sun, 11 Jan 2009 22:18:04 +0000 (23:18 +0100)
committerAndre Noll <maan@systemlinux.org>
Sun, 11 Jan 2009 22:18:04 +0000 (23:18 +0100)
- Reserve some bytes in the udp header for future extensions.
- Simplify udp audio header reading/writing.
- Handle short reads in udp_recv.
- Extended sanity checks for the udp audio header.
- Fix a memory leak in the error path of udp_recv_open().

error.h
udp_header.h
udp_recv.c
udp_send.c

diff --git a/error.h b/error.h
index 347327a..897aeb2 100644 (file)
--- a/error.h
+++ b/error.h
@@ -208,11 +208,10 @@ extern const char **para_errlist[];
 
 
 #define UDP_RECV_ERRORS \
-       PARA_ERROR(UDP_NO_MAGIC, "not enough magic"), \
        PARA_ERROR(UDP_SYNTAX, "udp_recv syntax error"), \
-       PARA_ERROR(UDP_SHORT_PACKET, "udp data packet too short"), \
-       PARA_ERROR(INVALID_HEADER, "invalid header packet"), \
-       PARA_ERROR(OVERRUN, "output buffer overrun"), \
+       PARA_ERROR(UDP_BAD_HEADER, "invalid udp audio header"), \
+       PARA_ERROR(UDP_OVERRUN, "output buffer overrun"), \
+       PARA_ERROR(UDP_BAD_STREAM_TYPE, "invalid stream type"), \
 
 
 #define HTTP_RECV_ERRORS \
index 24f6bb8..83157e6 100644 (file)
  * this header, the type of the current audio stream and the * type of this
  * data chunk is coded.
  */
-#define UDP_AUDIO_HEADER_LEN 8
+#define UDP_AUDIO_HEADER_LEN 16
 
 /** The possible stream types. */
 enum udp_stream_type {
        /** Used for mp3 and aac streams. */
        UDP_PLAIN_STREAM,
-       /* Ogg vorbis streams. */
-       UDP_HEADER_STREAM
+       /** Ogg vorbis streams. */
+       UDP_HEADER_STREAM,
+       /** stream type not yet known. */
+       UDP_UNKNOWN_STREAM
 };
 
 /** The possible packet types. */
@@ -32,106 +34,66 @@ enum udp_audio_packet_type {
        /** Combined header/data packet (ogg only). */
        UDP_HEADER_PACKET,
        /** Packet contains only audio file data. */
-       UDP_DATA_PACKET
+       UDP_DATA_PACKET,
+       /** Invalid packet type. */
+       UDP_UNKNOWN_PACKET
+};
+
+/** The contents of an udp audio header. */
+struct udp_audio_header {
+       /** see \ref udp_stream_type. */
+       uint8_t stream_type;
+       /** see \ref udp_audio_packet_type. */
+       uint8_t packet_type;
+       /** Non-zero only for header packets. */
+       uint16_t header_len;
+       /** Length of header plus audio file data. */
+       uint16_t payload_len;
 };
 
 /**
- * Write the magic bytes to the beginning of a buffer.
+ * Write a struct udp_audio_header to a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The audio header to write.
+ *
+ * Used by the udp sender.
  *
- * \param buf The buffer.
  */
-_static_inline_ void udp_write_magic(char *buf)
+_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h)
 {
        memcpy(buf, "UDPM", 4);
+       write_u8(buf + 4, h->stream_type);
+       write_u8(buf + 5, h->packet_type);
+       write_u16(buf + 6, h->header_len);
+       write_u16(buf + 8, h->payload_len);
+       memset(buf + 10, 0, 6);
 }
 
 /**
- * Check whether this buffer contains the magic bytes.
+ * Used by the udp receiver to read a struct udp_audio_header from a buffer.
  *
- * \param buf The buffer.
- * \param len The number of bytes of \a buf.
+ * \param buf The buffer to read from.
+ * \param len The length of \a buf.
+ * \param h Result pointer.
  *
- * \return Positive if \a buf contains the magic bytes,
- * -1 otherwise.
+ * \return 1 if \a buf contains a valid udp audio header, -1 else.
  */
-_static_inline_ int udp_check_magic(char *buf, size_t len)
+_static_inline_ int read_udp_audio_header(char *buf, size_t len,
+               struct udp_audio_header *h)
 {
        if (len < 4)
-               return -1;
+               goto err;
        if (memcmp(buf, "UDPM", 4))
-               return -1;
+               goto err;
+       h->stream_type = read_u8(buf + 4);
+       h->packet_type = read_u8(buf + 5);
+       h->header_len = read_u16(buf + 6);
+       h->payload_len = read_u16(buf + 8);
        return 1;
-}
-
-/**
- * Write the type of the audio stream to a buffer.
- *
- * \param buf The buffer.
- * \param type The type to be written.
- *
- * \sa \ref udp_stream_type.
- */
-_static_inline_ void udp_write_stream_type(char *buf, uint8_t type)
-{
-       write_u8(buf + 4, type);
-}
-
-/**
- * Read the type of the audio stream from a buffer.
- *
- * \param buf The buffer.
- *
- * \return Either UDP_PLAIN_STREAM or UDP_HEADER_STREAM.
- * \sa \ref udp_stream_type.
- */
-_static_inline_ uint8_t udp_read_stream_type(char *buf)
-{
-       return read_u8(buf + 4);
-}
-
-/**
- * Write the type of this packet to a buffer.
- *
- * \param buf The buffer.
- * \param type The type to be written.
- *
- * \sa \ref udp_audio_packet_type.
- */
-_static_inline_ void udp_write_packet_type(char *buf, uint8_t type)
-{
-       write_u8(buf + 5, type);
-}
-
-/**
- * Read the type of this buffer.
- *
- * \param buf The buffer.
- *
- * \return The packet type, see \ref udp_stream_type.
- */
-_static_inline_ uint8_t udp_read_packet_type(char *buf)
-{
-       return read_u8(buf + 5);
-}
-
-/**
- * Write the length of the header (non-zero only for ogg streams).
- *
- * \param buf The buffer.
- * \param len The length of the header in bytes.
- */
-_static_inline_ void udp_write_header_len(char *buf, uint16_t len)
-{
-       write_u16(buf + 6, len);
-}
-
-/**
- * Read the length of the header.
- *
- * \param buf The buffer.
- * \return The header length in bytes.
- */
-_static_inline_ uint16_t udp_read_header_len(char *buf)
-{
-       return read_u16(buf + 6);
+err:
+       h->stream_type = UDP_UNKNOWN_STREAM;
+       h->packet_type = UDP_UNKNOWN_PACKET;
+       h->header_len = h->payload_len = 0;
+       return -1;
 }
index 66dc36b..bdc2541 100644 (file)
@@ -8,6 +8,7 @@
 #include <dirent.h>
 
 #include "para.h"
+#include "error.h"
 #include "portable_io.h"
 #include "udp_header.h"
 #include "list.h"
@@ -15,7 +16,6 @@
 #include "ggo.h"
 #include "recv.h"
 #include "udp_recv.cmdline.h"
-#include "error.h"
 #include "audiod.h"
 #include "string.h"
 #include "net.h"
@@ -43,6 +43,10 @@ struct private_udp_recv_data {
        int have_header;
        /** The socket file descriptor. */
        int fd;
+       /** Non-zero on short reads. */
+       uint16_t need_more;
+       /** Copied from the first audio header received. */
+       uint16_t stream_type;
 };
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
@@ -58,14 +62,74 @@ static int enough_space(size_t nbytes, size_t loaded)
        return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
 }
 
+/*
+ * Perform some sanity checks on an udp audio file header.
+ *
+ * return: negative on error, 0: discard data, 1: use data
+ */
+static int examine_audio_header(struct private_udp_recv_data *purd,
+               struct udp_audio_header *uah, size_t packet_size)
+{
+       /* payload_len includes header */
+       if (uah->payload_len < uah->header_len)
+               return -E_UDP_BAD_HEADER;
+       switch (uah->packet_type) {
+       case UDP_EOF_PACKET:
+               return -E_RECV_EOF;
+       case UDP_BOF_PACKET:
+               purd->have_header = 1;
+               /* fall through */
+       case UDP_DATA_PACKET:
+               if (uah->header_len) /* header in no-header packet */
+                       return -E_UDP_BAD_HEADER;
+               break;
+       case UDP_HEADER_PACKET:
+               if (!uah->header_len) /** no header in header packet */
+                       return -E_UDP_BAD_HEADER;
+               break;
+       default: /* bad packet type */
+               return -E_UDP_BAD_HEADER;
+       }
+       /* check stream type */
+       if (uah->stream_type != UDP_PLAIN_STREAM &&
+                       uah->stream_type != UDP_HEADER_STREAM)
+               return -E_UDP_BAD_STREAM_TYPE;
+       if (purd->stream_type == UDP_UNKNOWN_STREAM)
+               purd->stream_type = uah->stream_type;
+       /* stream type must not change */
+       if (uah->stream_type != purd->stream_type)
+               return -E_UDP_BAD_STREAM_TYPE;
+       if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
+               /* can't use the data, wait for header packet */
+               return 0;
+       if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
+               /* we read only a part of the package */
+               purd->need_more = uah->payload_len
+                       + UDP_AUDIO_HEADER_LEN - packet_size;
+       return 1;
+}
+
+static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
+{
+       if (!len)
+               return 1;
+       if (!enough_space(len, rn->loaded))
+               return -E_UDP_OVERRUN;
+       memcpy(rn->buf + rn->loaded, buf, len);
+       rn->loaded += len;
+       return 1;
+}
+
 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_udp_recv_data *purd = rn->private_data;
        int ret;
        char tmpbuf[UDP_RECV_CHUNK_SIZE];
+       uint16_t data_len;
+       char *data_buf;
        size_t packet_size;
-       uint8_t stream_type, packet_type;
+       struct udp_audio_header uah;
 
        if (rn->output_error && *rn->output_error < 0) {
                t->error = *rn->output_error;
@@ -84,71 +148,45 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
        if (!ret)
                return;
        packet_size = ret;
-       t->error = -E_UDP_SHORT_PACKET;
-       if (packet_size < UDP_AUDIO_HEADER_LEN)
-               return;
-       t->error = -E_UDP_NO_MAGIC;
-       if (udp_check_magic(tmpbuf, packet_size) < 0)
-               return;
-       stream_type = udp_read_stream_type(tmpbuf);
-       packet_type = udp_read_packet_type(tmpbuf);
-//     PARA_INFO_LOG("packet type: %d, stream type: %d,"
-//             " loaded: %u\n", packet_type,
-//             (unsigned) stream_type, rn->loaded);
-       switch (packet_type) {
-               uint16_t header_len, payload_len;
+       for (;;) {
+               uint16_t num;
 
-       case UDP_EOF_PACKET:
-               t->error = -E_RECV_EOF;
-               return;
-       case UDP_BOF_PACKET:
-               PARA_INFO_LOG("bof (%zu)\n", packet_size);
-               purd->have_header = 1;
-               /* fall through */
-       case UDP_DATA_PACKET:
-               if (!purd->have_header && stream_type == UDP_HEADER_STREAM)
-                       /* can't use the data, wait for header */
-                       goto success;
-               payload_len = packet_size - UDP_AUDIO_HEADER_LEN;
-               if (!payload_len)
-                       goto success;
-               t->error = -E_OVERRUN;
-               if (!enough_space(payload_len, rn->loaded))
-                       return;
-               memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN,
-                       payload_len);
-               rn->loaded += payload_len;
-               goto success;
-       case UDP_HEADER_PACKET:
-               header_len = udp_read_header_len(tmpbuf);
-               if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) {
-                       t->error = -E_INVALID_HEADER;
-                       return;
+               if (!purd->need_more) {
+                       ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
+                       if (ret >= 0)
+                               break;
+                       goto success; /* drop data */
                }
-//             PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n",
-//                     packet_size, header_len);
-               if (!purd->have_header) {
-                       t->error = -E_OVERRUN;
-                       if (!enough_space(header_len, rn->loaded))
-                               return;
-                       purd->have_header = 1;
-                       rn->loaded = header_len;
-                       memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN,
-                               rn->loaded);
+               num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
+               assert(num > 0);
+               t->error = add_rn_output(rn, tmpbuf, num);
+               if (t->error < 0)
+                       return;
+               purd->need_more -= num;
+               if (packet_size <= num)
                        goto success;
+               packet_size -= num;
+               memmove(tmpbuf, tmpbuf + num, packet_size);
+       }
+       assert(!purd->need_more);
+       t->error = examine_audio_header(purd, &uah, packet_size);
+       if (t->error <= 0)
+               return;
+       data_len = uah.payload_len;
+       data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
+       if (uah.packet_type == UDP_HEADER_PACKET) {
+               if (purd->have_header) { /* skip header */
+                       data_buf += uah.header_len;
+                       data_len -= uah.header_len;
+               } else { /* only use the header */
+                       purd->have_header = 1;
+                       data_len = uah.header_len;
                }
-               payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len;
-               if (!payload_len)
-                       goto success;
-               t->error = -E_OVERRUN;
-               if (!enough_space(payload_len, rn->loaded))
-                       return;
-               memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN +
-                       header_len, payload_len);
-               rn->loaded += payload_len;
        }
+       t->error = add_rn_output(rn, data_buf, data_len);
+       return;
 success:
-       t->error = 0;
+       t->error = 1;
 }
 
 static void udp_shutdown(void)
@@ -190,14 +228,19 @@ static int udp_recv_open(struct receiver_node *rn)
        purd = rn->private_data;
        ret = create_udp_recv_socket(c->host_arg, c->port_arg);
        if (ret < 0)
-               return ret;
+               goto err;
        purd->fd = ret;
        ret = mark_fd_nonblocking(purd->fd);
        if (ret < 0)
-               return ret;
+               goto err;
+       purd->stream_type = UDP_UNKNOWN_STREAM;
        PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
                c->port_arg, purd->fd);
        return purd->fd;
+err:
+       free(rn->private_data);
+       free(rn->buf);
+       return ret;
 }
 
 /**
index e83239d..b0f55bc 100644 (file)
@@ -136,9 +136,12 @@ static void udp_shutdown_targets(void)
 {
        char buf[UDP_AUDIO_HEADER_LEN];
        struct udp_target *ut, *tmp;
+       struct udp_audio_header uah = {
+               .stream_type = UDP_UNKNOWN_STREAM,
+               .packet_type = UDP_EOF_PACKET,
+       };
 
-       udp_write_packet_type(buf, UDP_EOF_PACKET);
-       udp_write_magic(buf);
+       write_udp_audio_header(buf, &uah);
        list_for_each_entry_safe(ut, tmp, &targets, node) {
                if (ut->fd < 0)
                        continue;
@@ -165,11 +168,10 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk
                const char *buf, size_t len, const char *header_buf,
                size_t header_len)
 {
-       size_t sendbuf_len;
-       uint8_t packet_type = UDP_DATA_PACKET;
        char *sendbuf;
+       size_t sendbuf_len;
        struct timeval *chunk_tv;
-       uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
+       struct udp_audio_header uah;
 
 //     PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len);
        if (sender_status != SENDER_ON)
@@ -181,22 +183,22 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk
                return;
        if (list_empty(&targets))
                return;
-       if (!need_extra_header(current_chunk))
-               header_len = 0;
+       uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
+       uah.header_len = need_extra_header(current_chunk)? header_len : 0;
        if (!current_chunk)
-               packet_type = UDP_BOF_PACKET;
-       else if (header_len)
-               packet_type = UDP_HEADER_PACKET;
-       sendbuf_len = UDP_AUDIO_HEADER_LEN + header_len + len;
+               uah.packet_type = UDP_BOF_PACKET;
+       else if (uah.header_len)
+               uah.packet_type = UDP_HEADER_PACKET;
+       else
+               uah.packet_type = UDP_DATA_PACKET;
+       uah.payload_len = uah.header_len + len;
+       sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len;
        sendbuf = para_malloc(sendbuf_len);
-       udp_write_magic(sendbuf);
-       udp_write_stream_type(sendbuf, stream_type);
-       udp_write_packet_type(sendbuf, packet_type);
-       udp_write_header_len(sendbuf, header_len);
-       if (header_len)
+       write_udp_audio_header(sendbuf, &uah);
+       if (uah.header_len)
                memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf,
-                       header_len);
-       memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + header_len, buf, len);
+                       uah.header_len);
+       memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len);
        udp_send_buf(sendbuf, sendbuf_len);
        free(sendbuf);
 }