para_fade: Add description of the three modes of operation.
[paraslash.git] / udp_recv.c
1 /*
2  * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6 /** \file udp_recv.c Paraslash's udp receiver */
7
8 #include <dirent.h>
9
10 #include "para.h"
11 #include "error.h"
12 #include "portable_io.h"
13 #include "udp_header.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "recv.h"
18 #include "udp_recv.cmdline.h"
19 #include "audiod.h"
20 #include "string.h"
21 #include "net.h"
22 #include "fd.h"
23
/**
 * Size in bytes of the receiver node buffer.
 *
 * The same value is used for the temporary buffer each incoming packet is
 * read into.
 */
#define UDP_RECV_CHUNK_SIZE (128 * 1024)
26
/**
 * Per-node state of the udp receiver.
 *
 * \sa \ref receiver, \ref receiver_node.
 */
struct private_udp_recv_data {
	/**
	 * Non-zero once the audio file header is available.
	 *
	 * Only relevant for streams of type UDP_HEADER_STREAM: as long as
	 * this flag is unset, the packets of such a stream are discarded.
	 * Streams which do not send extra headers (mp3, aac) are not
	 * affected by this flag.
	 */
	int have_header;
	/** File descriptor of the receiving udp socket. */
	int fd;
	/**
	 * Number of bytes of the current packet that are still missing.
	 *
	 * Zero unless the last read returned fewer bytes than the audio
	 * header of that packet announced.
	 */
	uint16_t need_more;
	/** The stream type, taken from the first accepted audio header. */
	uint16_t stream_type;
};
51
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
53 {
54         struct receiver_node *rn = container_of(t, struct receiver_node, task);
55         struct private_udp_recv_data *purd = rn->private_data;
56
57         para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
58 }
59
60 static int enough_space(size_t nbytes, size_t loaded)
61 {
62         return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
63 }
64
65 /*
66  * Perform some sanity checks on an udp audio file header.
67  *
68  * return: negative on error, 0: discard data, 1: use data
69  */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71                 struct udp_audio_header *uah, size_t packet_size)
72 {
73         /* payload_len includes header */
74         if (uah->payload_len < uah->header_len)
75                 return -E_UDP_BAD_HEADER;
76         switch (uah->packet_type) {
77         case UDP_EOF_PACKET:
78                 return -E_RECV_EOF;
79         case UDP_BOF_PACKET:
80                 purd->have_header = 1;
81                 /* fall through */
82         case UDP_DATA_PACKET:
83                 if (uah->header_len) /* header in no-header packet */
84                         return -E_UDP_BAD_HEADER;
85                 break;
86         case UDP_HEADER_PACKET:
87                 if (!uah->header_len) /** no header in header packet */
88                         return -E_UDP_BAD_HEADER;
89                 break;
90         default: /* bad packet type */
91                 return -E_UDP_BAD_HEADER;
92         }
93         /* check stream type */
94         if (uah->stream_type != UDP_PLAIN_STREAM &&
95                         uah->stream_type != UDP_HEADER_STREAM)
96                 return -E_UDP_BAD_STREAM_TYPE;
97         if (purd->stream_type == UDP_UNKNOWN_STREAM)
98                 purd->stream_type = uah->stream_type;
99         /* stream type must not change */
100         if (uah->stream_type != purd->stream_type)
101                 return -E_UDP_BAD_STREAM_TYPE;
102         if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103                 /* can't use the data, wait for header packet */
104                 return 0;
105         if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106                 /* we read only a part of the package */
107                 purd->need_more = uah->payload_len
108                         + UDP_AUDIO_HEADER_LEN - packet_size;
109         return 1;
110 }
111
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
113 {
114         if (!len)
115                 return 1;
116         if (!enough_space(len, rn->loaded))
117                 return -E_UDP_OVERRUN;
118         memcpy(rn->buf + rn->loaded, buf, len);
119         rn->loaded += len;
120         return 1;
121 }
122
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
124 {
125         struct receiver_node *rn = container_of(t, struct receiver_node, task);
126         struct private_udp_recv_data *purd = rn->private_data;
127         int ret;
128         char tmpbuf[UDP_RECV_CHUNK_SIZE];
129         uint16_t data_len;
130         char *data_buf;
131         size_t packet_size;
132         struct udp_audio_header uah;
133
134         if (rn->output_error && *rn->output_error < 0) {
135                 t->error = *rn->output_error;
136                 return;
137         }
138         if (!FD_ISSET(purd->fd, &s->rfds))
139                 return;
140         ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
141         if (ret < 0) {
142                 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
143                         goto success;
144                 t->error = ret;
145                 return;
146         }
147         t->error = -E_RECV_EOF;
148         if (!ret)
149                 return;
150         packet_size = ret;
151         for (;;) {
152                 uint16_t num;
153
154                 if (!purd->need_more) {
155                         ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
156                         if (ret >= 0)
157                                 break;
158                         goto success; /* drop data */
159                 }
160                 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
161                 assert(num > 0);
162                 t->error = add_rn_output(rn, tmpbuf, num);
163                 if (t->error < 0)
164                         return;
165                 purd->need_more -= num;
166                 if (packet_size <= num)
167                         goto success;
168                 packet_size -= num;
169                 memmove(tmpbuf, tmpbuf + num, packet_size);
170         }
171         assert(!purd->need_more);
172         t->error = examine_audio_header(purd, &uah, packet_size);
173         if (t->error <= 0)
174                 return;
175         data_len = uah.payload_len;
176         data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177         if (uah.packet_type == UDP_HEADER_PACKET) {
178                 if (purd->have_header) { /* skip header */
179                         data_buf += uah.header_len;
180                         data_len -= uah.header_len;
181                 } else { /* only use the header */
182                         purd->have_header = 1;
183                         data_len = uah.header_len;
184                 }
185         }
186         t->error = add_rn_output(rn, data_buf, data_len);
187         return;
188 success:
189         t->error = 1;
190 }
191
/* The shutdown method of the udp receiver. There is nothing to clean up. */
static void udp_shutdown(void)
{
	/* Intentionally empty. */
}
196
197 static void udp_recv_close(struct receiver_node *rn)
198 {
199         struct private_udp_recv_data *purd = rn->private_data;
200
201         if (purd->fd >= 0)
202                 close(purd->fd);
203         free(rn->private_data);
204         free(rn->buf);
205 }
206
207 static void *udp_recv_parse_config(int argc, char **argv)
208 {
209         int ret;
210         struct udp_recv_args_info *tmp =
211                 para_calloc(sizeof(struct udp_recv_args_info));
212
213         ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
214         if (ret >= 0)
215                 return tmp;
216         free(tmp);
217         return NULL;
218 }
219
220 static int udp_recv_open(struct receiver_node *rn)
221 {
222         struct private_udp_recv_data *purd;
223         struct udp_recv_args_info *c = rn->conf;
224         int ret;
225
226         rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
227         rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
228         purd = rn->private_data;
229         ret = create_udp_recv_socket(c->host_arg, c->port_arg);
230         if (ret < 0)
231                 goto err;
232         purd->fd = ret;
233         ret = mark_fd_nonblocking(purd->fd);
234         if (ret < 0)
235                 goto err;
236         purd->stream_type = UDP_UNKNOWN_STREAM;
237         PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
238                 c->port_arg, purd->fd);
239         return purd->fd;
240 err:
241         free(rn->private_data);
242         free(rn->buf);
243         return ret;
244 }
245
246 /**
247  * The init function of the udp receiver.
248  *
249  * \param r Pointer to the receiver struct to initialize.
250  *
251  * Initialize all function pointers of \a r.
252  */
253 void udp_recv_init(struct receiver *r)
254 {
255         struct udp_recv_args_info dummy;
256
257         udp_recv_cmdline_parser_init(&dummy);
258         r->shutdown = udp_shutdown;
259         r->open = udp_recv_open;
260         r->close = udp_recv_close;
261         r->pre_select = udp_recv_pre_select;
262         r->post_select = udp_recv_post_select;
263         r->parse_config = udp_recv_parse_config;
264         r->help = (struct ggo_help) {
265                 .short_help = udp_recv_args_info_help,
266                 .detailed_help = udp_recv_args_info_detailed_help
267         };
268 }