2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
4 * Licensed under the GPL v2. For licensing details see COPYING.
6 /** \file udp_recv.c Paraslash's udp receiver */
12 #include "portable_io.h"
13 #include "udp_header.h"
18 #include "udp_recv.cmdline.h"
24 /** The size of the receiver node buffer. */
/*
 * Besides sizing rn->buf in udp_recv_open(), this constant is also the size
 * of the temporary buffer in udp_recv_post_select(), the byte count passed
 * to recv_bin_buffer() there, and the limit checked by enough_space().
 */
25 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
28 * Data specific to the udp receiver.
30 * \sa \ref receiver, \ref receiver_node.
/*
 * udp_recv_open() allocates one instance of this structure with
 * para_calloc() and stores it in rn->private_data; udp_recv_close() frees
 * it. The functions of this file access the members have_header, fd,
 * need_more and stream_type through a pointer to this structure.
 * NOTE(review): the member declarations themselves are not visible in this
 * excerpt; their types must be confirmed against the complete file.
 */
32 struct private_udp_recv_data {
34 * Whether a header was received.
36 * A flag indicating whether this receiver already received a packet
37 * which contains the audio file header.
39 * This flag has no effect if the audio stream indicates that no extra
40 * headers will be sent (mp3, aac). Otherwise, all data packets are
41 * dropped until the header is received.
44 /** The socket file descriptor. */
46 /** Non-zero on short reads. */
48 /** Copied from the first audio header received. */
/*
 * Pre-select hook of the udp receiver (installed as r->pre_select by
 * udp_recv_init()).
 *
 * s: the scheduler; its rfds set and max_fileno are passed to para_fd_set().
 * t: the task embedded in a receiver node; the node is recovered with
 *    container_of().
 *
 * Hands the receiving socket purd->fd to para_fd_set() together with
 * &s->rfds and &s->max_fileno (presumably adding the fd to the read set;
 * udp_recv_post_select() later tests the same fd with FD_ISSET()).
 */
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
54 struct receiver_node *rn = container_of(t, struct receiver_node, task);
55 struct private_udp_recv_data *purd = rn->private_data;
57 para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
/*
 * Check whether nbytes additional bytes fit into a buffer that already
 * holds loaded bytes.
 *
 * Returns non-zero iff nbytes + loaded is strictly smaller than
 * UDP_RECV_CHUNK_SIZE. Because the comparison is strict, filling the buffer
 * to exactly UDP_RECV_CHUNK_SIZE bytes is reported as "not enough space".
 */
60 static int enough_space(size_t nbytes, size_t loaded)
62 return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
66 * Perform some sanity checks on a udp audio file header.
68 * return: negative on error, 0: discard data, 1: use data
/*
 * purd:        receiver state; this function may set have_header,
 *              stream_type and need_more.
 * uah:         the udp audio header to check; payload_len, header_len,
 *              packet_type and stream_type are read.
 * packet_size: compared against uah->payload_len + UDP_AUDIO_HEADER_LEN to
 *              detect a partially read packet.
 *
 * The error values visible here are -E_UDP_BAD_HEADER and
 * -E_UDP_BAD_STREAM_TYPE.
 * NOTE(review): most case labels of the switch and the non-error return
 * statements are not visible in this excerpt; confirm the exact control
 * flow against the complete file.
 */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71 struct udp_audio_header *uah, size_t packet_size)
73 /* payload_len includes header */
74 if (uah->payload_len < uah->header_len)
75 return -E_UDP_BAD_HEADER;
76 switch (uah->packet_type) {
80 purd->have_header = 1;
83 if (uah->header_len) /* header in no-header packet */
84 return -E_UDP_BAD_HEADER;
86 case UDP_HEADER_PACKET:
87 if (!uah->header_len) /** no header in header packet */
88 return -E_UDP_BAD_HEADER;
90 default: /* bad packet type */
91 return -E_UDP_BAD_HEADER;
93 /* check stream type */
94 if (uah->stream_type != UDP_PLAIN_STREAM &&
95 uah->stream_type != UDP_HEADER_STREAM)
96 return -E_UDP_BAD_STREAM_TYPE;
/*
 * udp_recv_open() initializes purd->stream_type to UDP_UNKNOWN_STREAM, so
 * the first header that gets this far determines the stream type.
 */
97 if (purd->stream_type == UDP_UNKNOWN_STREAM)
98 purd->stream_type = uah->stream_type;
99 /* stream type must not change */
100 if (uah->stream_type != purd->stream_type)
101 return -E_UDP_BAD_STREAM_TYPE;
102 if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103 /* can't use the data, wait for header packet */
/* remember how many bytes of this packet are still outstanding */
105 if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106 /* we read only a part of the packet */
107 purd->need_more = uah->payload_len
108 + UDP_AUDIO_HEADER_LEN - packet_size;
/*
 * Append len bytes from buf to the receiver node buffer rn->buf, starting
 * at offset rn->loaded.
 *
 * Returns -E_UDP_OVERRUN without copying if enough_space() reports that
 * len more bytes do not fit.
 * NOTE(review): the update of rn->loaded and the return value on success
 * are not visible in this excerpt; confirm against the complete file.
 */
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
116 if (!enough_space(len, rn->loaded))
117 return -E_UDP_OVERRUN;
118 memcpy(rn->buf + rn->loaded, buf, len);
/*
 * Post-select hook of the udp receiver (installed as r->post_select by
 * udp_recv_init()).
 *
 * Steps visible in this excerpt:
 *  - a negative *rn->output_error is copied to t->error;
 *  - purd->fd is tested with FD_ISSET() against s->rfds;
 *  - up to UDP_RECV_CHUNK_SIZE bytes are read into tmpbuf with
 *    recv_bin_buffer(); EINTR and EAGAIN are tested for explicitly and
 *    -E_RECV_EOF is one of the values assigned to t->error;
 *  - if purd->need_more is zero, the udp audio header is parsed from tmpbuf
 *    with read_udp_audio_header(); the code dealing with a non-zero
 *    need_more appends up to need_more bytes of tmpbuf to the node buffer
 *    and moves the rest to the start of tmpbuf;
 *  - the header is checked by examine_audio_header() and the selected part
 *    of the payload is handed to add_rn_output().
 *
 * NOTE(review): s is marked __a_unused although s->rfds is read below.
 * NOTE(review): several statements (local declarations, returns, gotos,
 * labels, braces) are not visible in this excerpt, so the exact control
 * flow between the steps above must be confirmed against the complete file.
 */
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
125 struct receiver_node *rn = container_of(t, struct receiver_node, task);
126 struct private_udp_recv_data *purd = rn->private_data;
/* temporary receive buffer of UDP_RECV_CHUNK_SIZE (128 * 1024) bytes */
128 char tmpbuf[UDP_RECV_CHUNK_SIZE];
132 struct udp_audio_header uah;
/* propagate an error reported through rn->output_error */
134 if (rn->output_error && *rn->output_error < 0) {
135 t->error = *rn->output_error;
138 if (!FD_ISSET(purd->fd, &s->rfds))
140 ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
142 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
147 t->error = -E_RECV_EOF;
/* need_more == 0: tmpbuf is expected to start with a udp audio header */
154 if (!purd->need_more) {
155 ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
158 goto success; /* drop data */
/*
 * NOTE(review): the cast truncates packet_size to 16 bits before taking the
 * minimum; confirm that packet_size cannot exceed 65535 here.
 */
160 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
162 t->error = add_rn_output(rn, tmpbuf, num);
165 purd->need_more -= num;
166 if (packet_size <= num)
/*
 * NOTE(review): whether packet_size is reduced by num before this call is
 * not visible in this excerpt. If it is not, the copy reads num bytes
 * beyond the received data -- confirm against the complete file.
 */
169 memmove(tmpbuf, tmpbuf + num, packet_size);
171 assert(!purd->need_more);
172 t->error = examine_audio_header(purd, &uah, packet_size);
/* by default the whole payload following the udp audio header is used */
175 data_len = uah.payload_len;
176 data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177 if (uah.packet_type == UDP_HEADER_PACKET) {
178 if (purd->have_header) { /* skip header */
179 data_buf += uah.header_len;
180 data_len -= uah.header_len;
181 } else { /* only use the header */
182 purd->have_header = 1;
183 data_len = uah.header_len;
186 t->error = add_rn_output(rn, data_buf, data_len);
/*
 * Shutdown hook of the udp receiver (installed as r->shutdown by
 * udp_recv_init()).
 * NOTE(review): the body of this function is not visible in this excerpt.
 */
192 static void udp_shutdown(void)
/*
 * Close hook of the udp receiver (installed as r->close by
 * udp_recv_init()).
 *
 * rn: the receiver node whose private data is released with free().
 *
 * NOTE(review): purd is fetched below but its use is not visible in this
 * excerpt; any further cleanup steps must be confirmed against the complete
 * file.
 */
197 static void udp_recv_close(struct receiver_node *rn)
199 struct private_udp_recv_data *purd = rn->private_data;
203 free(rn->private_data);
/*
 * Config parser hook of the udp receiver (installed as r->parse_config by
 * udp_recv_init()).
 *
 * argc, argv: passed unchanged to udp_recv_cmdline_parser().
 *
 * Allocates a struct udp_recv_args_info with para_calloc() and lets the
 * command line parser fill it in. A non-zero parser result is mapped to
 * -E_UDP_SYNTAX, a zero result to 1.
 * NOTE(review): the return statements are not visible in this excerpt, so
 * what is returned to the caller on success and on error must be confirmed
 * against the complete file.
 */
207 static void *udp_recv_parse_config(int argc, char **argv)
210 struct udp_recv_args_info *tmp =
211 para_calloc(sizeof(struct udp_recv_args_info));
213 ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
/*
 * Open hook of the udp receiver (installed as r->open by udp_recv_init()).
 *
 * rn: the receiver node to set up; rn->conf is used as a
 *     struct udp_recv_args_info.
 *
 * Visible steps: allocate rn->buf (UDP_RECV_CHUNK_SIZE bytes) and the
 * private data with para_calloc(), create the receiving socket for
 * c->host_arg and c->port_arg, mark purd->fd non-blocking, set the stream
 * type to UDP_UNKNOWN_STREAM and log a notice.
 * NOTE(review): the assignment of purd->fd, the error checks on ret, the
 * return statements and the label preceding the final free() are not
 * visible in this excerpt; the free() presumably belongs to an error path.
 */
220 static int udp_recv_open(struct receiver_node *rn)
222 struct private_udp_recv_data *purd;
223 struct udp_recv_args_info *c = rn->conf;
226 rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
227 rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
228 purd = rn->private_data;
229 ret = create_udp_recv_socket(c->host_arg, c->port_arg);
233 ret = mark_fd_nonblocking(purd->fd);
/* examine_audio_header() replaces this with the type of the first header */
236 purd->stream_type = UDP_UNKNOWN_STREAM;
237 PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
238 c->port_arg, purd->fd);
241 free(rn->private_data);
247 * The init function of the udp receiver.
249 * \param r Pointer to the receiver struct to initialize.
251 * Initialize all function pointers of \a r.
/*
 * Calls udp_recv_cmdline_parser_init() on a local dummy args structure,
 * then installs the shutdown, open, close, pre_select, post_select and
 * parse_config hooks of this file in r and sets r->help to the short and
 * detailed help texts of the udp receiver.
 */
253 void udp_recv_init(struct receiver *r)
255 struct udp_recv_args_info dummy;
257 udp_recv_cmdline_parser_init(&dummy);
258 r->shutdown = udp_shutdown;
259 r->open = udp_recv_open;
260 r->close = udp_recv_close;
261 r->pre_select = udp_recv_pre_select;
262 r->post_select = udp_recv_post_select;
263 r->parse_config = udp_recv_parse_config;
264 r->help = (struct ggo_help) {
265 .short_help = udp_recv_args_info_help,
266 .detailed_help = udp_recv_args_info_detailed_help