2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
4 * Licensed under the GPL v2. For licencing details see COPYING.
6 /** \file udp_recv.c Paraslash's udp receiver */
12 #include "portable_io.h"
13 #include "udp_header.h"
18 #include "udp_recv.cmdline.h"
24 /** The size of the receiver node buffer. */
25 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
28 * Data specific to the udp receiver.
30 * \sa \ref receiver, \ref receiver_node.
32 struct private_udp_recv_data {
34 * Whether a header was received.
36 * A flag indicating whether this receiver already received a packet
37 * which contains the audio file header.
39 * This flag has no effect if the audio stream indicates that no extra
40 * headers will be sent (mp3, aac). Otherwise, all data packets are
41 * dropped until the header is received.
44 /** The socket file descriptor. */
46 /** Non-zero on short reads. */
48 /** Copied from the first audio header received. */
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
54 struct receiver_node *rn = container_of(t, struct receiver_node, task);
55 struct private_udp_recv_data *purd = rn->private_data;
57 para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
60 static int enough_space(size_t nbytes, size_t loaded)
62 return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
66 * Perform some sanity checks on a UDP audio file header.
68 * return: negative on error, 0: discard data, 1: use data
/*
 * NOTE(review): this listing omits several short lines of the function
 * (braces, some case labels, break/return statements). The comments below
 * describe only the statements that are visible.
 */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71 struct udp_audio_header *uah, size_t packet_size)
73 /* payload_len includes header */
74 if (uah->payload_len < uah->header_len)
75 return -E_UDP_BAD_HEADER;
/* Validate the header length against the packet type. */
76 switch (uah->packet_type) {
80 purd->have_header = 1;
83 if (uah->header_len) /* header in no-header packet */
84 return -E_UDP_BAD_HEADER;
86 case UDP_HEADER_PACKET:
87 if (!uah->header_len) /* no header in header packet */
88 return -E_UDP_BAD_HEADER;
90 default: /* bad packet type */
91 return -E_UDP_BAD_HEADER;
93 /* check stream type */
94 if (uah->stream_type != UDP_PLAIN_STREAM &&
95 uah->stream_type != UDP_HEADER_STREAM)
96 return -E_UDP_BAD_STREAM_TYPE;
/* No stream type recorded yet: remember the one announced by this packet. */
97 if (purd->stream_type == UDP_UNKNOWN_STREAM)
98 purd->stream_type = uah->stream_type;
99 /* stream type must not change */
100 if (uah->stream_type != purd->stream_type)
101 return -E_UDP_BAD_STREAM_TYPE;
102 if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103 /* can't use the data, wait for header packet */
/*
 * Fewer bytes were received than the header announces. Record the number of
 * outstanding bytes in need_more.
 */
105 if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106 /* we read only a part of the packet */
107 purd->need_more = uah->payload_len
108 + UDP_AUDIO_HEADER_LEN - packet_size;
/*
 * Append len bytes from buf to the receiver node buffer at offset
 * rn->loaded.
 *
 * Returns -E_UDP_OVERRUN if enough_space() reports that the data does not
 * fit. NOTE(review): the remaining statements of this function (including
 * the success return value and the update of rn->loaded, if any) are not
 * visible in this listing.
 */
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
116 if (!enough_space(len, rn->loaded))
117 return -E_UDP_OVERRUN;
118 memcpy(rn->buf + rn->loaded, buf, len);
/*
 * The post_select hook of the udp receiver: receive data from the socket,
 * examine the udp audio header and append usable payload to the receiver
 * node buffer via add_rn_output(). Errors are reported through t->error.
 *
 * NOTE(review): many short lines of this function (declarations, gotos,
 * labels, returns, braces) are not visible in this listing; the comments
 * below cover only the visible statements.
 *
 * NOTE(review): s is tagged __a_unused although s->rfds is read below.
 */
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
125 struct receiver_node *rn = container_of(t, struct receiver_node, task);
126 struct private_udp_recv_data *purd = rn->private_data;
/* NOTE(review): UDP_RECV_CHUNK_SIZE (128 * 1024) bytes of automatic storage. */
128 char tmpbuf[UDP_RECV_CHUNK_SIZE];
132 struct udp_audio_header uah;
/* A negative value in *rn->output_error is copied to the task error. */
134 if (rn->output_error && *rn->output_error < 0) {
135 t->error = *rn->output_error;
/* Socket not flagged readable; presumably nothing is done in this case. */
138 if (!FD_ISSET(purd->fd, &s->rfds))
140 ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
/* EINTR and EAGAIN are treated separately from other receive errors. */
142 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
147 t->error = -E_RECV_EOF;
/* need_more == 0: the buffer starts with a new packet, so parse its header. */
154 if (!purd->need_more) {
155 ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
158 goto success; /* drop data */
/*
 * need_more != 0: the data continues a packet that was only partially read
 * before. At most need_more bytes belong to that packet.
 * NOTE(review): the (uint16_t) cast truncates packet_size -- confirm that
 * packet_size cannot exceed 65535 here.
 */
160 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
162 t->error = add_rn_output(rn, tmpbuf, num);
165 purd->need_more -= num;
166 if (packet_size <= num)
/*
 * Move the data following the consumed bytes to the start of tmpbuf.
 * NOTE(review): the length argument is packet_size; verify that packet_size
 * has been reduced by num before this call, otherwise the move reads num
 * bytes beyond the received data.
 */
169 memmove(tmpbuf, tmpbuf + num, packet_size);
171 assert(!purd->need_more);
172 t->error = examine_audio_header(purd, &uah, packet_size);
/* The payload starts right behind the udp audio header. */
175 data_len = uah.payload_len;
176 data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177 if (uah.packet_type == UDP_HEADER_PACKET) {
178 if (purd->have_header) { /* skip header */
179 data_buf += uah.header_len;
180 data_len -= uah.header_len;
181 } else { /* only use the header */
182 purd->have_header = 1;
183 data_len = uah.header_len;
186 t->error = add_rn_output(rn, data_buf, data_len);
/*
 * The shutdown hook of the udp receiver, installed as r->shutdown by
 * udp_recv_init(). NOTE(review): the body is not visible in this listing.
 */
192 static void udp_shutdown(void)
/*
 * The close hook of the udp receiver: release the private data of the
 * receiver node.
 *
 * NOTE(review): statements between the visible lines (presumably closing
 * the socket and freeing rn->buf) are not shown in this listing -- confirm
 * against the full file.
 */
197 static void udp_recv_close(struct receiver_node *rn)
199 struct private_udp_recv_data *purd = rn->private_data;
203 free(rn->private_data);
/*
 * The parse_config hook of the udp receiver: allocate a zeroed
 * udp_recv_args_info structure and fill it from the given command line.
 *
 * NOTE(review): the return statements are not visible in this listing; the
 * handling of the error case is therefore not documented here.
 */
207 static void *udp_recv_parse_config(int argc, char **argv)
210 struct udp_recv_args_info *tmp =
211 para_calloc(sizeof(struct udp_recv_args_info));
/* A non-zero parser result is mapped to -E_UDP_SYNTAX, success to 1. */
213 ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
221 * Perform AF-independent joining of multicast receive addresses.
223 * \param fd Bound socket descriptor.
224 * \param iface The receiving multicast interface, or NULL for the default.
226 * \return Zero if okay, negative on error.
/*
 * NOTE(review): case labels, local declarations, gotos and braces of this
 * function are not visible in this listing; the comments below cover only
 * the visible statements.
 */
228 static int mcast_receiver_setup(int fd, const char *iface)
230 struct sockaddr_storage ss;
231 socklen_t sslen = sizeof(ss);
/* id == 0 stands for the default interface, also when iface does not resolve. */
232 int id = iface == NULL ? 0 : if_nametoindex(iface);
/* The address the socket is bound to determines family and group to join. */
234 if (getsockname(fd, (struct sockaddr *)&ss, &sslen) < 0)
237 if (iface != NULL && id == 0)
238 PARA_WARNING_LOG("could not resolve interface %s, using default", iface);
240 switch (ss.ss_family) {
/*
 * NOTE(review): s_addr is in network byte order while IN_MULTICAST() expects
 * host byte order, so ntohl() would express the intent; on common platforms
 * htonl() performs the same conversion.
 */
242 if (IN_MULTICAST(htonl(((struct sockaddr_in *)&ss)->sin_addr.s_addr))) {
246 m4.imr_address.s_addr = INADDR_ANY;
251 m4.imr_interface.s_addr = INADDR_ANY;
253 PARA_ERROR_LOG("Setting IPv4 receiver mcast interface not supported.");
/* Join the IPv4 group given by the bound address. */
256 m4.imr_multiaddr = ((struct sockaddr_in *)&ss)->sin_addr;
258 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &m4, sizeof(m4)) < 0)
263 if (IN6_IS_ADDR_MULTICAST(&((struct sockaddr_in6 *)&ss)->sin6_addr)) {
266 memset(&m6, 0, sizeof(m6));
/* 16 is the size in bytes of an IPv6 address. */
267 memcpy(&m6.ipv6mr_multiaddr, &((struct sockaddr_in6 *)&ss)->sin6_addr, 16);
268 m6.ipv6mr_interface = id;
269 if (setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &m6, sizeof(m6)) < 0)
274 PARA_ERROR_LOG("address family %d not supported", ss.ss_family);
275 return -E_ADDRESS_LOOKUP;
/* Convert the current errno value into a negative paraslash error code. */
278 return -ERRNO_TO_PARA_ERROR(errno);
/*
 * The open hook of the udp receiver: allocate the node buffer and the
 * private data, create the UDP socket for the configured host and port, join
 * the multicast group if applicable and switch the socket to non-blocking
 * mode.
 *
 * NOTE(review): the error checks after each call, the return statements and
 * part of the cleanup path are not visible in this listing.
 */
281 static int udp_recv_open(struct receiver_node *rn)
283 struct private_udp_recv_data *purd;
284 struct udp_recv_args_info *c = rn->conf;
/* Pass NULL as interface unless one was given in the configuration. */
285 char *iface = c->iface_given ? c->iface_arg : NULL;
288 rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
289 rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
290 purd = rn->private_data;
292 ret = makesock(AF_UNSPEC, IPPROTO_UDP, 1, c->host_arg, c->port_arg);
297 ret = mcast_receiver_setup(purd->fd, iface);
303 ret = mark_fd_nonblocking(purd->fd);
/* The stream type is taken from the first audio header that is examined. */
306 purd->stream_type = UDP_UNKNOWN_STREAM;
307 PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
308 c->port_arg, purd->fd);
/* Release the private data again (cleanup). */
311 free(rn->private_data);
317 * The init function of the udp receiver.
319 * \param r Pointer to the receiver struct to initialize.
321 * Initialize all function pointers of \a r.
323 void udp_recv_init(struct receiver *r)
325 struct udp_recv_args_info dummy;
327 udp_recv_cmdline_parser_init(&dummy);
328 r->shutdown = udp_shutdown;
329 r->open = udp_recv_open;
330 r->close = udp_recv_close;
331 r->pre_select = udp_recv_pre_select;
332 r->post_select = udp_recv_post_select;
333 r->parse_config = udp_recv_parse_config;
334 r->help = (struct ggo_help) {
335 .short_help = udp_recv_args_info_help,
336 .detailed_help = udp_recv_args_info_detailed_help