Move ggo files to a separate directory.
[paraslash.git] / udp_recv.c
1 /*
2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6 /** \file udp_recv.c Paraslash's udp receiver */
7
8 #include <dirent.h>
9
10 #include "para.h"
11 #include "error.h"
12 #include "portable_io.h"
13 #include "udp_header.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "recv.h"
18 #include "udp_recv.cmdline.h"
19 #include "audiod.h"
20 #include "string.h"
21 #include "net.h"
22 #include "fd.h"
23
24 /** The size of the receiver node buffer. */
25 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
26
27 /**
28 * Data specific to the udp receiver.
29 *
30 * \sa \ref receiver, \ref receiver_node.
31 */
32 struct private_udp_recv_data {
33 /**
34 * Whether a header was received.
35 *
36 * A flag indicating whether this receiver already received a packet
37 * which contains the audio file header.
38 *
39 * This flag has no effect if the audio stream indicates that no extra
40 * headers will be sent (mp3, aac). Otherwise, all data packets are
41 * dropped until the header is received.
42 */
43 int have_header;
44 /** The socket file descriptor. */
45 int fd;
46 /** Non-zero on short reads. */
47 uint16_t need_more;
48 /** Copied from the first audio header received. */
49 uint16_t stream_type;
50 };
51
52 static void udp_recv_pre_select(struct sched *s, struct task *t)
53 {
54 struct receiver_node *rn = container_of(t, struct receiver_node, task);
55 struct private_udp_recv_data *purd = rn->private_data;
56
57 para_fd_set(purd->fd, &s->rfds, &s->max_fileno);
58 }
59
60 static int enough_space(size_t nbytes, size_t loaded)
61 {
62 return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
63 }
64
65 /*
66 * Perform some sanity checks on an udp audio file header.
67 *
68 * return: negative on error, 0: discard data, 1: use data
69 */
70 static int examine_audio_header(struct private_udp_recv_data *purd,
71 struct udp_audio_header *uah, size_t packet_size)
72 {
73 /* payload_len includes header */
74 if (uah->payload_len < uah->header_len)
75 return -E_UDP_BAD_HEADER;
76 switch (uah->packet_type) {
77 case UDP_EOF_PACKET:
78 return -E_RECV_EOF;
79 case UDP_BOF_PACKET:
80 purd->have_header = 1;
81 /* fall through */
82 case UDP_DATA_PACKET:
83 if (uah->header_len) /* header in no-header packet */
84 return -E_UDP_BAD_HEADER;
85 break;
86 case UDP_HEADER_PACKET:
87 if (!uah->header_len) /** no header in header packet */
88 return -E_UDP_BAD_HEADER;
89 break;
90 default: /* bad packet type */
91 return -E_UDP_BAD_HEADER;
92 }
93 /* check stream type */
94 if (uah->stream_type != UDP_PLAIN_STREAM &&
95 uah->stream_type != UDP_HEADER_STREAM)
96 return -E_UDP_BAD_STREAM_TYPE;
97 if (purd->stream_type == UDP_UNKNOWN_STREAM)
98 purd->stream_type = uah->stream_type;
99 /* stream type must not change */
100 if (uah->stream_type != purd->stream_type)
101 return -E_UDP_BAD_STREAM_TYPE;
102 if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
103 /* can't use the data, wait for header packet */
104 return 0;
105 if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
106 /* we read only a part of the package */
107 purd->need_more = uah->payload_len
108 + UDP_AUDIO_HEADER_LEN - packet_size;
109 return 1;
110 }
111
112 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
113 {
114 if (!len)
115 return 1;
116 if (!enough_space(len, rn->loaded))
117 return -E_UDP_OVERRUN;
118 memcpy(rn->buf + rn->loaded, buf, len);
119 rn->loaded += len;
120 return 1;
121 }
122
123 static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
124 {
125 struct receiver_node *rn = container_of(t, struct receiver_node, task);
126 struct private_udp_recv_data *purd = rn->private_data;
127 int ret;
128 char tmpbuf[UDP_RECV_CHUNK_SIZE];
129 uint16_t data_len;
130 char *data_buf;
131 size_t packet_size;
132 struct udp_audio_header uah;
133
134 if (rn->output_error && *rn->output_error < 0) {
135 t->error = *rn->output_error;
136 return;
137 }
138 if (!FD_ISSET(purd->fd, &s->rfds))
139 return;
140 ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE);
141 if (ret < 0) {
142 if (is_errno(ret, EINTR) || is_errno(ret, EAGAIN))
143 goto success;
144 t->error = ret;
145 return;
146 }
147 t->error = -E_RECV_EOF;
148 if (!ret)
149 return;
150 packet_size = ret;
151 for (;;) {
152 uint16_t num;
153
154 if (!purd->need_more) {
155 ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
156 if (ret >= 0)
157 break;
158 goto success; /* drop data */
159 }
160 num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
161 assert(num > 0);
162 t->error = add_rn_output(rn, tmpbuf, num);
163 if (t->error < 0)
164 return;
165 purd->need_more -= num;
166 if (packet_size <= num)
167 goto success;
168 packet_size -= num;
169 memmove(tmpbuf, tmpbuf + num, packet_size);
170 }
171 assert(!purd->need_more);
172 t->error = examine_audio_header(purd, &uah, packet_size);
173 if (t->error <= 0)
174 return;
175 data_len = uah.payload_len;
176 data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
177 if (uah.packet_type == UDP_HEADER_PACKET) {
178 if (purd->have_header) { /* skip header */
179 data_buf += uah.header_len;
180 data_len -= uah.header_len;
181 } else { /* only use the header */
182 purd->have_header = 1;
183 data_len = uah.header_len;
184 }
185 }
186 t->error = add_rn_output(rn, data_buf, data_len);
187 return;
188 success:
189 t->error = 1;
190 }
191
192 static void udp_shutdown(void)
193 {
194 return;
195 }
196
197 static void udp_recv_close(struct receiver_node *rn)
198 {
199 struct private_udp_recv_data *purd = rn->private_data;
200
201 if (purd->fd >= 0)
202 close(purd->fd);
203 free(rn->private_data);
204 free(rn->buf);
205 }
206
207 static void *udp_recv_parse_config(int argc, char **argv)
208 {
209 int ret;
210 struct udp_recv_args_info *tmp =
211 para_calloc(sizeof(struct udp_recv_args_info));
212
213 ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1;
214 if (ret >= 0)
215 return tmp;
216 free(tmp);
217 return NULL;
218 }
219
220 static int udp_recv_open(struct receiver_node *rn)
221 {
222 struct private_udp_recv_data *purd;
223 struct udp_recv_args_info *c = rn->conf;
224 int ret;
225
226 rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE);
227 rn->private_data = para_calloc(sizeof(struct private_udp_recv_data));
228 purd = rn->private_data;
229 ret = create_udp_recv_socket(c->host_arg, c->port_arg);
230 if (ret < 0)
231 goto err;
232 purd->fd = ret;
233 ret = mark_fd_nonblocking(purd->fd);
234 if (ret < 0)
235 goto err;
236 purd->stream_type = UDP_UNKNOWN_STREAM;
237 PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
238 c->port_arg, purd->fd);
239 return purd->fd;
240 err:
241 free(rn->private_data);
242 free(rn->buf);
243 return ret;
244 }
245
246 /**
247 * The init function of the udp receiver.
248 *
249 * \param r Pointer to the receiver struct to initialize.
250 *
251 * Initialize all function pointers of \a r.
252 */
253 void udp_recv_init(struct receiver *r)
254 {
255 struct udp_recv_args_info dummy;
256
257 udp_recv_cmdline_parser_init(&dummy);
258 r->shutdown = udp_shutdown;
259 r->open = udp_recv_open;
260 r->close = udp_recv_close;
261 r->pre_select = udp_recv_pre_select;
262 r->post_select = udp_recv_post_select;
263 r->parse_config = udp_recv_parse_config;
264 r->help = (struct ggo_help) {
265 .short_help = udp_recv_args_info_help,
266 .detailed_help = udp_recv_args_info_detailed_help
267 };
268 }