ccd769b0e6e1d36b996b87874f09190f5d2fbb7e
[paraslash.git] / udp_recv.c
1 /*
2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6 /** \file udp_recv.c Paraslash's udp receiver */
7
8 #include <dirent.h>
9
10 #include "para.h"
11 #include "error.h"
12 #include "portable_io.h"
13 #include "udp_header.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "recv.h"
18 #include "udp_recv.cmdline.h"
19 #include "audiod.h"
20 #include "string.h"
21 #include "net.h"
22 #include "fd.h"
23
24 /** The size of the receiver node buffer. */
25 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
26
27 /**
28 * Data specific to the udp receiver.
29 *
30 * \sa \ref receiver, \ref receiver_node.
31 */
32 struct private_udp_recv_data {
33 /**
34 * Whether a header was received.
35 *
36 * A flag indicating whether this receiver already received a packet
37 * which contains the audio file header.
38 *
39 * This flag has no effect if the audio stream indicates that no extra
40 * headers will be sent (mp3, aac). Otherwise, all data packets are
41 * dropped until the header is received.
42 */
43 int have_header;
44 /** The socket file descriptor. */
45 int fd;
46 /** Non-zero on short reads. */
47 uint16_t need_more;
48 /** Copied from the first audio header received. */
49 uint16_t stream_type;
50 };
51
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
53 {
54 struct receiver_node *rn = container_of(t, struct receiver_node, task);
55 struct private_udp_recv_data *purd = rn->private_data;
56
57 para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
58 }
59
60 static int enough_space(size_t nbytes, size_t loaded)
61 {
62 return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
63 }
64
65 /*
66 * Perform some sanity checks on an udp audio file header.
67 *
68 * return: negative on error, 0: discard data, 1: use data
69 */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71 struct udp_audio_header *uah, size_t packet_size)
72 {
73 /* payload_len includes header */
74 if (uah->payload_len < uah->header_len)
75 return -E_UDP_BAD_HEADER;
76 switch (uah->packet_type) {
77 case UDP_EOF_PACKET:
78 return -E_RECV_EOF;
79 case UDP_BOF_PACKET:
80 purd->have_header = 1;
81 /* fall through */
82 case UDP_DATA_PACKET:
83 if (uah->header_len) /* header in no-header packet */
84 return -E_UDP_BAD_HEADER;
85 break;
86 case UDP_HEADER_PACKET:
87 if (!uah->header_len) /** no header in header packet */
88 return -E_UDP_BAD_HEADER;
89 break;
90 default: /* bad packet type */
91 return -E_UDP_BAD_HEADER;
92 }
93 /* check stream type */
94 if (uah->stream_type != UDP_PLAIN_STREAM &&
95 uah->stream_type != UDP_HEADER_STREAM)
96 return -E_UDP_BAD_STREAM_TYPE;
97 if (purd->stream_type == UDP_UNKNOWN_STREAM)
98 purd->stream_type = uah->stream_type;
99 /* stream type must not change */
100 if (uah->stream_type != purd->stream_type)
101 return -E_UDP_BAD_STREAM_TYPE;
102 if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103 /* can't use the data, wait for header packet */
104 return 0;
105 if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106 /* we read only a part of the package */
107 purd->need_more = uah->payload_len
108 + UDP_AUDIO_HEADER_LEN - packet_size;
109 return 1;
110 }
111
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
113 {
114 if (!len)
115 return 1;
116 if (!enough_space(len, rn->loaded))
117 return -E_UDP_OVERRUN;
118 memcpy(rn->buf + rn->loaded, buf, len);
119 rn->loaded += len;
120 return 1;
121 }
122
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
124 {
125 struct receiver_node *rn = container_of(t, struct receiver_node, task);
126 struct private_udp_recv_data *purd = rn->private_data;
127 int ret;
128 char tmpbuf[UDP_RECV_CHUNK_SIZE];
129 uint16_t data_len;
130 char *data_buf;
131 size_t packet_size;
132 struct udp_audio_header uah;
133
134 if (rn->output_error && *rn->output_error < 0) {
135 t->error = *rn->output_error;
136 return;
137 }
138 if (!FD_ISSET(purd->fd, &s->rfds))
139 return;
140 ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
141 if (ret < 0) {
142 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
143 goto success;
144 t->error = ret;
145 return;
146 }
147 t->error = -E_RECV_EOF;
148 if (!ret)
149 return;
150 packet_size = ret;
151 for (;;) {
152 uint16_t num;
153
154 if (!purd->need_more) {
155 ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
156 if (ret >= 0)
157 break;
158 goto success; /* drop data */
159 }
160 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
161 assert(num > 0);
162 t->error = add_rn_output(rn, tmpbuf, num);
163 if (t->error < 0)
164 return;
165 purd->need_more -= num;
166 if (packet_size <= num)
167 goto success;
168 packet_size -= num;
169 memmove(tmpbuf, tmpbuf + num, packet_size);
170 }
171 assert(!purd->need_more);
172 t->error = examine_audio_header(purd, &uah, packet_size);
173 if (t->error <= 0)
174 return;
175 data_len = uah.payload_len;
176 data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177 if (uah.packet_type == UDP_HEADER_PACKET) {
178 if (purd->have_header) { /* skip header */
179 data_buf += uah.header_len;
180 data_len -= uah.header_len;
181 } else { /* only use the header */
182 purd->have_header = 1;
183 data_len = uah.header_len;
184 }
185 }
186 t->error = add_rn_output(rn, data_buf, data_len);
187 return;
188 success:
189 t->error = 1;
190 }
191
192 static void udp_shutdown(void)
193 {
194 return;
195 }
196
197 static void udp_recv_close(struct receiver_node *rn)
198 {
199 struct private_udp_recv_data *purd = rn->private_data;
200
201 if (purd->fd >= 0)
202 close(purd->fd);
203 free(rn->private_data);
204 free(rn->buf);
205 }
206
207 static void *udp_recv_parse_config(int argc, char **argv)
208 {
209 int ret;
210 struct udp_recv_args_info *tmp =
211 para_calloc(sizeof(struct udp_recv_args_info));
212
213 ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
214 if (ret >= 0)
215 return tmp;
216 free(tmp);
217 return NULL;
218 }
219
220 /*
221 * Perform AF-independent joining of multicast receive addresses.
222 *
223 * \param fd Bound socket descriptor.
224 *
225 * \return Zero if okay, negative on error.
226 */
227 static int mcast_receiver_setup(int fd)
228 {
229 struct sockaddr_storage ss;
230 socklen_t sslen = sizeof(ss);
231
232 if (getsockname(fd, (struct sockaddr *)&ss, &sslen) < 0)
233 goto err;
234
235 switch (ss.ss_family) {
236 case AF_INET:
237 if (IN_MULTICAST(htonl(((struct sockaddr_in *)&ss)->sin_addr.s_addr))) {
238 struct ip_mreq m4;
239
240 memset(&m4, 0, sizeof(m4));
241 m4.imr_multiaddr = ((struct sockaddr_in *)&ss)->sin_addr;
242 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &m4, sizeof(m4)) < 0)
243 break;
244 }
245 return 0;
246 case AF_INET6:
247 if (IN6_IS_ADDR_MULTICAST(&((struct sockaddr_in6 *)&ss)->sin6_addr)) {
248 struct ipv6_mreq m6;
249
250 memset(&m6, 0, sizeof(m6));
251 memcpy(&m6.ipv6mr_multiaddr, &((struct sockaddr_in6 *)&ss)->sin6_addr, 16);
252 if (setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &m6, sizeof(m6)) < 0)
253 break;
254 }
255 return 0;
256 default:
257 PARA_ERROR_LOG("address family %d not supported", ss.ss_family);
258 return -E_ADDRESS_LOOKUP;
259 }
260 err:
261 return -ERRNO_TO_PARA_ERROR(errno);
262 }
263
264 static int udp_recv_open(struct receiver_node *rn)
265 {
266 struct private_udp_recv_data *purd;
267 struct udp_recv_args_info *c = rn->conf;
268 int ret;
269
270 rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
271 rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
272 purd = rn->private_data;
273
274 ret = makesock(AF_UNSPEC, IPPROTO_UDP, 1, c->host_arg, c->port_arg);
275 if (ret < 0)
276 goto err;
277 purd->fd = ret;
278
279 ret = mcast_receiver_setup(purd->fd);
280 if (ret < 0) {
281 close(purd->fd);
282 return ret;
283 }
284
285 ret = mark_fd_nonblocking(purd->fd);
286 if (ret < 0)
287 goto err;
288 purd->stream_type = UDP_UNKNOWN_STREAM;
289 PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
290 c->port_arg, purd->fd);
291 return purd->fd;
292 err:
293 free(rn->private_data);
294 free(rn->buf);
295 return ret;
296 }
297
298 /**
299 * The init function of the udp receiver.
300 *
301 * \param r Pointer to the receiver struct to initialize.
302 *
303 * Initialize all function pointers of \a r.
304 */
305 void udp_recv_init(struct receiver *r)
306 {
307 struct udp_recv_args_info dummy;
308
309 udp_recv_cmdline_parser_init(&dummy);
310 r->shutdown = udp_shutdown;
311 r->open = udp_recv_open;
312 r->close = udp_recv_close;
313 r->pre_select = udp_recv_pre_select;
314 r->post_select = udp_recv_post_select;
315 r->parse_config = udp_recv_parse_config;
316 r->help = (struct ggo_help) {
317 .short_help = udp_recv_args_info_help,
318 .detailed_help = udp_recv_args_info_detailed_help
319 };
320 }