]> git.tuebingen.mpg.de Git - paraslash.git/blob - udp_recv.c
Merge commit 'fml/master'
[paraslash.git] / udp_recv.c
1 /*
2  * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6 /** \file udp_recv.c Paraslash's udp receiver */
7
8 #include <dirent.h>
9
10 #include "para.h"
11 #include "error.h"
12 #include "portable_io.h"
13 #include "udp_header.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "recv.h"
18 #include "udp_recv.cmdline.h"
19 #include "audiod.h"
20 #include "string.h"
21 #include "net.h"
22 #include "fd.h"
23
/**
 * Capacity in bytes of the receiver node buffer.
 *
 * The same value is used for the temporary buffer into which each incoming
 * packet is read.
 */
#define UDP_RECV_CHUNK_SIZE (128 * 1024)
26
/**
 * Per-instance state of the udp receiver.
 *
 * \sa \ref receiver, \ref receiver_node.
 */
struct private_udp_recv_data {
	/**
	 * Set once the audio file header has been seen.
	 *
	 * Only streams which announce that extra headers are being sent are
	 * affected: for these, data packets are discarded until this flag
	 * becomes non-zero. For streams without extra headers (mp3, aac) the
	 * flag has no effect.
	 */
	int have_header;
	/** Descriptor of the socket the packets are read from. */
	int fd;
	/**
	 * Number of bytes of the current packet which are still outstanding.
	 *
	 * Zero unless an earlier read returned less than the announced
	 * packet size.
	 */
	uint16_t need_more;
	/** Stream type, taken from the first audio header that was accepted. */
	uint16_t stream_type;
};
51
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
53 {
54         struct receiver_node *rn = container_of(t, struct receiver_node, task);
55         struct private_udp_recv_data *purd = rn->private_data;
56
57         para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
58 }
59
60 static int enough_space(size_t nbytes, size_t loaded)
61 {
62         return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
63 }
64
65 /*
66  * Perform some sanity checks on an udp audio file header.
67  *
68  * return: negative on error, 0: discard data, 1: use data
69  */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71                 struct udp_audio_header *uah, size_t packet_size)
72 {
73         /* payload_len includes header */
74         if (uah->payload_len < uah->header_len)
75                 return -E_UDP_BAD_HEADER;
76         switch (uah->packet_type) {
77         case UDP_EOF_PACKET:
78                 return -E_RECV_EOF;
79         case UDP_BOF_PACKET:
80                 purd->have_header = 1;
81                 /* fall through */
82         case UDP_DATA_PACKET:
83                 if (uah->header_len) /* header in no-header packet */
84                         return -E_UDP_BAD_HEADER;
85                 break;
86         case UDP_HEADER_PACKET:
87                 if (!uah->header_len) /** no header in header packet */
88                         return -E_UDP_BAD_HEADER;
89                 break;
90         default: /* bad packet type */
91                 return -E_UDP_BAD_HEADER;
92         }
93         /* check stream type */
94         if (uah->stream_type != UDP_PLAIN_STREAM &&
95                         uah->stream_type != UDP_HEADER_STREAM)
96                 return -E_UDP_BAD_STREAM_TYPE;
97         if (purd->stream_type == UDP_UNKNOWN_STREAM)
98                 purd->stream_type = uah->stream_type;
99         /* stream type must not change */
100         if (uah->stream_type != purd->stream_type)
101                 return -E_UDP_BAD_STREAM_TYPE;
102         if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103                 /* can't use the data, wait for header packet */
104                 return 0;
105         if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106                 /* we read only a part of the package */
107                 purd->need_more = uah->payload_len
108                         + UDP_AUDIO_HEADER_LEN - packet_size;
109         return 1;
110 }
111
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
113 {
114         if (!len)
115                 return 1;
116         if (!enough_space(len, rn->loaded))
117                 return -E_UDP_OVERRUN;
118         memcpy(rn->buf + rn->loaded, buf, len);
119         rn->loaded += len;
120         return 1;
121 }
122
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
124 {
125         struct receiver_node *rn = container_of(t, struct receiver_node, task);
126         struct private_udp_recv_data *purd = rn->private_data;
127         int ret;
128         char tmpbuf[UDP_RECV_CHUNK_SIZE];
129         uint16_t data_len;
130         char *data_buf;
131         size_t packet_size;
132         struct udp_audio_header uah;
133
134         if (rn->output_error && *rn->output_error < 0) {
135                 t->error = *rn->output_error;
136                 return;
137         }
138         if (!FD_ISSET(purd->fd, &s->rfds))
139                 return;
140         ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
141         if (ret < 0) {
142                 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
143                         goto success;
144                 t->error = ret;
145                 return;
146         }
147         t->error = -E_RECV_EOF;
148         if (!ret)
149                 return;
150         packet_size = ret;
151         for (;;) {
152                 uint16_t num;
153
154                 if (!purd->need_more) {
155                         ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
156                         if (ret >= 0)
157                                 break;
158                         goto success; /* drop data */
159                 }
160                 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
161                 assert(num > 0);
162                 t->error = add_rn_output(rn, tmpbuf, num);
163                 if (t->error < 0)
164                         return;
165                 purd->need_more -= num;
166                 if (packet_size <= num)
167                         goto success;
168                 packet_size -= num;
169                 memmove(tmpbuf, tmpbuf + num, packet_size);
170         }
171         assert(!purd->need_more);
172         t->error = examine_audio_header(purd, &uah, packet_size);
173         if (t->error <= 0)
174                 return;
175         data_len = uah.payload_len;
176         data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177         if (uah.packet_type == UDP_HEADER_PACKET) {
178                 if (purd->have_header) { /* skip header */
179                         data_buf += uah.header_len;
180                         data_len -= uah.header_len;
181                 } else { /* only use the header */
182                         purd->have_header = 1;
183                         data_len = uah.header_len;
184                 }
185         }
186         t->error = add_rn_output(rn, data_buf, data_len);
187         return;
188 success:
189         t->error = 1;
190 }
191
/* Shutdown hook of the udp receiver. Intentionally a no-op. */
static void udp_shutdown(void)
{
}
196
197 static void udp_recv_close(struct receiver_node *rn)
198 {
199         struct private_udp_recv_data *purd = rn->private_data;
200
201         if (purd->fd >= 0)
202                 close(purd->fd);
203         free(rn->private_data);
204         free(rn->buf);
205 }
206
207 static void *udp_recv_parse_config(int argc, char **argv)
208 {
209         int ret;
210         struct udp_recv_args_info *tmp =
211                 para_calloc(sizeof(struct udp_recv_args_info));
212
213         ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
214         if (ret >= 0)
215                 return tmp;
216         free(tmp);
217         return NULL;
218 }
219
220 /*
221  * Perform AF-independent joining of multicast receive addresses.
222  *
223  * \param fd    Bound socket descriptor.
224  * \param iface The receiving multicast interface, or NULL for the default.
225  *
226  * \return Zero if okay, negative on error.
227  */
228 static int mcast_receiver_setup(int fd, const char *iface)
229 {
230         struct sockaddr_storage ss;
231         socklen_t sslen = sizeof(ss);
232         int id = iface == NULL ? 0 : if_nametoindex(iface);
233
234         if (getsockname(fd, (struct sockaddr *)&ss, &sslen) < 0)
235                 goto err;
236
237         if (iface != NULL && id == 0)
238                 PARA_WARNING_LOG("could not resolve interface %s, using default", iface);
239
240         switch (ss.ss_family) {
241         case AF_INET:
242                 if (IN_MULTICAST(htonl(((struct sockaddr_in *)&ss)->sin_addr.s_addr))) {
243 #ifdef HAVE_IP_MREQN
244                         struct ip_mreqn m4;
245
246                         m4.imr_address.s_addr   = INADDR_ANY;
247                         m4.imr_ifindex          = id;
248 #else
249                         struct ip_mreq m4;
250
251                         m4.imr_interface.s_addr = INADDR_ANY;
252                         if (id != 0)
253                                 PARA_ERROR_LOG("Setting IPv4 receiver mcast interface not supported.");
254
255 #endif
256                         m4.imr_multiaddr        = ((struct sockaddr_in *)&ss)->sin_addr;
257
258                         if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &m4, sizeof(m4)) < 0)
259                                 break;
260                 }
261                 return 0;
262         case AF_INET6:
263                 if (IN6_IS_ADDR_MULTICAST(&((struct sockaddr_in6 *)&ss)->sin6_addr)) {
264                         struct ipv6_mreq m6;
265
266                         memset(&m6, 0, sizeof(m6));
267                         memcpy(&m6.ipv6mr_multiaddr, &((struct sockaddr_in6 *)&ss)->sin6_addr, 16);
268                         m6.ipv6mr_interface = id;
269                         if (setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &m6, sizeof(m6)) < 0)
270                                 break;
271                 }
272                 return 0;
273         default:
274                 PARA_ERROR_LOG("address family %d not supported", ss.ss_family);
275                 return -E_ADDRESS_LOOKUP;
276         }
277 err:
278         return -ERRNO_TO_PARA_ERROR(errno);
279 }
280
281 static int udp_recv_open(struct receiver_node *rn)
282 {
283         struct private_udp_recv_data *purd;
284         struct udp_recv_args_info *c = rn->conf;
285         char  *iface = c->iface_given ? c->iface_arg : NULL;
286         int ret;
287
288         rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
289         rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
290         purd = rn->private_data;
291
292         ret = makesock(AF_UNSPEC, IPPROTO_UDP, 1, c->host_arg, c->port_arg);
293         if (ret < 0)
294                 goto err;
295         purd->fd = ret;
296
297         ret = mcast_receiver_setup(purd->fd, iface);
298         if (ret < 0) {
299                 close(purd->fd);
300                 return ret;
301         }
302
303         ret = mark_fd_nonblocking(purd->fd);
304         if (ret < 0)
305                 goto err;
306         purd->stream_type = UDP_UNKNOWN_STREAM;
307         PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
308                 c->port_arg, purd->fd);
309         return purd->fd;
310 err:
311         free(rn->private_data);
312         free(rn->buf);
313         return ret;
314 }
315
316 /**
317  * The init function of the udp receiver.
318  *
319  * \param r Pointer to the receiver struct to initialize.
320  *
321  * Initialize all function pointers of \a r.
322  */
323 void udp_recv_init(struct receiver *r)
324 {
325         struct udp_recv_args_info dummy;
326
327         udp_recv_cmdline_parser_init(&dummy);
328         r->shutdown = udp_shutdown;
329         r->open = udp_recv_open;
330         r->close = udp_recv_close;
331         r->pre_select = udp_recv_pre_select;
332         r->post_select = udp_recv_post_select;
333         r->parse_config = udp_recv_parse_config;
334         r->help = (struct ggo_help) {
335                 .short_help = udp_recv_args_info_help,
336                 .detailed_help = udp_recv_args_info_detailed_help
337         };
338 }