build: Convert alsa detection to new macros.
[paraslash.git] / sync_filter.c
1 /*
2 * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file sync_filter.c Playback synchronization filter. */
8
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <regex.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
14 #include <sys/un.h>
15 #include <netdb.h>
16
17 #include "para.h"
18 #include "sync_filter.cmdline.h"
19 #include "list.h"
20 #include "net.h"
21 #include "sched.h"
22 #include "ggo.h"
23 #include "buffer_tree.h"
24 #include "filter.h"
25 #include "string.h"
26 #include "fd.h"
27 #include "error.h"
28
/*
 * Static information about one configured buddy.
 *
 * One element per buddy argument is set up by sync_parse_config() and
 * released again by sync_free_config().
 */
struct sync_buddy_info {
	/* The buddy argument exactly as given; points into the parsed options. */
	const char *url;
	/* Host part extracted from the url, heap-allocated. */
	char *host;
	/* Port extracted from the url, or the configured port if none was given. */
	int port;
	/* Address lookup result for host and port. */
	struct addrinfo *ai;
	/* Set if the buddy did not ping us before the timeout, cleared on ping. */
	bool disabled;
};
36
37 /* One active buddy */
38 struct sync_buddy {
39 int fd;
40 struct sync_buddy_info *sbi;
41 bool ping_received;
42 struct list_head node;
43 };
44
45 /* Allocated in ->open() */
46 struct sync_filter_context {
47 int listen_fd;
48 struct list_head buddies;
49 struct timeval timeout;
50 bool ping_sent;
51 };
52
/*
 * Filter configuration, shared between all instances of the filter.
 *
 * Allocated in ->parse_config() and freed in ->free_config().
 */
struct sync_filter_config {
	/* The parsed command line options. */
	struct sync_filter_args_info *conf;
	/* Array with one element per given buddy, see sync_parse_config(). */
	struct sync_buddy_info *buddy_info;
};
58
/* Iterate over all buddies of a list. The list must not be modified. */
#define FOR_EACH_BUDDY(_buddy, _list) \
	list_for_each_entry(_buddy, _list, node)

/* Like FOR_EACH_BUDDY(), but the current buddy may be removed from the list. */
#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
	list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
63
64 static void sync_close_buddy(struct sync_buddy *buddy)
65 {
66 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
67 close(buddy->fd);
68 list_del(&buddy->node);
69 free(buddy);
70 }
71
72 static void sync_close_buddies(struct sync_filter_context *ctx)
73 {
74 struct sync_buddy *buddy, *tmp;
75
76 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
77 sync_close_buddy(buddy);
78 }
79
80 static void sync_close(struct filter_node *fn)
81 {
82 struct sync_filter_context *ctx = fn->private_data;
83
84 sync_close_buddies(ctx);
85 if (ctx->listen_fd >= 0) {
86 close(ctx->listen_fd);
87 ctx->listen_fd = -1;
88 }
89 free(ctx);
90 fn->private_data = NULL;
91 }
92
93 static void sync_free_config(void *conf)
94 {
95 struct sync_filter_config *sfc = conf;
96 int i;
97
98 for (i = 0; i < sfc->conf->buddy_given; i++) {
99 free(sfc->buddy_info[i].host);
100 freeaddrinfo(sfc->buddy_info[i].ai);
101 }
102 sync_filter_cmdline_parser_free(sfc->conf);
103 free(sfc);
104 }
105
106 static void sync_open(struct filter_node *fn)
107 {
108 int i, ret;
109 struct sync_filter_config *sfc = fn->conf;
110 struct sync_buddy *buddy;
111 struct sync_filter_context *ctx;
112
113 assert(sfc);
114
115 ctx = fn->private_data = para_calloc(sizeof(*ctx));
116 INIT_LIST_HEAD(&ctx->buddies);
117 ctx->listen_fd = -1;
118
119 /* create socket to listen for incoming packets */
120 ret = makesock(
121 IPPROTO_UDP,
122 true /* passive */,
123 NULL /* no host required */,
124 sfc->conf->port_arg,
125 NULL /* no flowopts */
126 );
127 if (ret < 0) {
128 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
129 sfc->conf->port_arg);
130 return;
131 }
132 ctx->listen_fd = ret;
133 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
134
135 for (i = 0; i < sfc->conf->buddy_given; i++) {
136 struct sync_buddy_info *sbi = sfc->buddy_info + i;
137 const char *url = sfc->conf->buddy_arg[i];
138 int fd;
139
140 /* make buddy udp socket from address info */
141 assert(sbi->ai);
142 ret = makesock_addrinfo(
143 IPPROTO_UDP,
144 false /* not passive */,
145 sbi->ai,
146 NULL /* no flowopts */
147 );
148 if (ret < 0) {
149 PARA_WARNING_LOG("could not make socket for %s\n",
150 url);
151 goto fail;
152 }
153 fd = ret;
154 ret = mark_fd_nonblocking(fd);
155 if (ret < 0) {
156 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
157 url);
158 close(fd);
159 goto fail;
160 }
161 buddy = para_malloc(sizeof(*buddy));
162 buddy->fd = fd;
163 buddy->sbi = sbi;
164 buddy->ping_received = false;
165 para_list_add(&buddy->node, &ctx->buddies);
166
167 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
168 continue;
169 fail:
170 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
171 }
172 }
173
174 /*
175 * At parse config time, we build an array of struct sync_buddy_info with one
176 * entry for each buddy given in the arguments. This array is not affected by
177 * sync_close(), so information stored there can be used for multiple instances
178 * (para_audiod). We store the resolved url and the ->disabled bit in this
179 * array.
180 */
181 static int sync_parse_config(int argc, char **argv, void **result)
182 {
183 int i, ret, n;
184 struct sync_filter_config *sfc;
185 struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
186
187 sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
188 sfc = para_calloc(sizeof(*sfc));
189 sfc->conf = conf;
190 n = conf->buddy_given;
191 sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
192 PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
193 for (i = 0; i < n; i++) {
194 const char *url = conf->buddy_arg[i];
195 size_t len = strlen(url);
196 char *host = para_malloc(len + 1);
197 int port;
198 struct addrinfo *ai;
199 struct sync_buddy_info *sbi = sfc->buddy_info + i;
200
201 if (!parse_url(url, host, len, &port)) {
202 free(host);
203 PARA_ERROR_LOG("could not parse url %s\n", url);
204 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
205 goto fail;
206 }
207 if (port < 0)
208 port = conf->port_arg;
209 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
210 host, port, &ai);
211 if (ret < 0) {
212 PARA_ERROR_LOG("host lookup failure for %s\n", url);
213 free(host);
214 goto fail;
215 }
216 sbi->url = url;
217 sbi->host = host;
218 sbi->port = port;
219 sbi->ai = ai;
220 sbi->disabled = false;
221 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
222 }
223 *result = sfc;
224 return 1;
225 fail:
226 assert(ret < 0);
227 PARA_ERROR_LOG("%s\n", para_strerror(-ret));
228 sync_free_config(sfc);
229 return ret;
230 }
231
232 /*
233 * True if we sent a packet to all buddies and received a packet from each
234 * enabled buddy.
235 */
236 static bool sync_complete(struct sync_filter_context *ctx)
237 {
238 struct sync_buddy *buddy;
239
240 if (!ctx->ping_sent)
241 return false;
242 FOR_EACH_BUDDY(buddy, &ctx->buddies)
243 if (!buddy->sbi->disabled && !buddy->ping_received)
244 return false;
245 return true;
246 }
247
248 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
249 {
250 struct sync_buddy *buddy;
251
252 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
253 if (buddy->sbi->disabled)
254 continue;
255 if (buddy->ping_received == true)
256 continue;
257 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
258 buddy->sbi->disabled = true;
259 }
260 }
261
262 static void sync_set_timeout(struct sync_filter_context *ctx,
263 struct sync_filter_config *sfc)
264 {
265 struct timeval to;
266
267 ms2tv(sfc->conf->timeout_arg, &to);
268 tv_add(now, &to, &ctx->timeout);
269 }
270
271 static void sync_pre_select(struct sched *s, void *context)
272 {
273 int ret;
274 struct filter_node *fn = context;
275 struct sync_filter_context *ctx = fn->private_data;
276 struct sync_filter_config *sfc = fn->conf;
277
278 if (list_empty(&ctx->buddies))
279 return sched_min_delay(s);
280 if (ctx->listen_fd < 0)
281 return sched_min_delay(s);
282 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
283 if (ret < 0)
284 return sched_min_delay(s);
285 para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
286 if (ret == 0)
287 return;
288 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
289 sync_set_timeout(ctx, sfc);
290 return sched_min_delay(s);
291 }
292 if (sync_complete(ctx)) /* push down what we have */
293 return sched_min_delay(s);
294 sched_request_barrier_or_min_delay(&ctx->timeout, s);
295 }
296
297 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
298 struct list_head *list)
299 {
300 struct sync_buddy *buddy;
301
302 FOR_EACH_BUDDY(buddy, list)
303 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
304 return buddy;
305 return NULL;
306 }
307
308 static int sync_post_select(__a_unused struct sched *s, void *context)
309 {
310 int ret;
311 struct filter_node *fn = context;
312 struct sync_filter_context *ctx = fn->private_data;
313 struct sync_filter_config *sfc = fn->conf;
314 struct sync_buddy *buddy, *tmp;
315
316 if (list_empty(&ctx->buddies))
317 goto success;
318 ret = -E_SYNC_LISTEN_FD;
319 if (ctx->listen_fd < 0)
320 goto fail;
321 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
322 if (ret < 0)
323 goto fail;
324 if (ret == 0)
325 return 0;
326 if (ctx->timeout.tv_sec == 0)
327 sync_set_timeout(ctx, sfc);
328 else {
329 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
330 sync_disable_active_buddies(ctx);
331 goto success;
332 }
333 }
334 if (!ctx->ping_sent) {
335 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
336 char c = '\0';
337 PARA_INFO_LOG("pinging %s (%s)\n",
338 buddy->sbi->url, buddy->sbi->disabled?
339 "disabled" : "enabled");
340 ret = xwrite(buddy->fd, &c, 1);
341 if (ret < 0) {
342 PARA_WARNING_LOG("failed to write to %s: %s\n",
343 buddy->sbi->url, para_strerror(-ret));
344 sync_close_buddy(buddy);
345 }
346 }
347 ctx->ping_sent = true;
348 }
349 if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
350 char c;
351 for (;;) {
352 struct sockaddr src_addr;
353 socklen_t len = sizeof(src_addr);
354 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
355 &src_addr, &len);
356 if (ret < 0) {
357 if (errno == EAGAIN || errno == EWOULDBLOCK)
358 break;
359 ret = -ERRNO_TO_PARA_ERROR(errno);
360 goto fail;
361 }
362 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
363 if (!buddy) {
364 PARA_NOTICE_LOG("pinged by unknown\n");
365 continue;
366 }
367 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
368 if (buddy->sbi->disabled) {
369 PARA_NOTICE_LOG("enabling %s\n",
370 buddy->sbi->url);
371 buddy->sbi->disabled = false;
372 }
373 buddy->ping_received = true;
374 }
375 }
376 if (!sync_complete(ctx))
377 return 1;
378 /*
379 * Although all enabled buddies are in sync we do not splice out
380 * ourselves immediately. We rather wait until the timeout expires,
381 * or the buddy list has become empty. This opens a time window
382 * for disabled buddies to become enabled by sending us a packet.
383 */
384 btr_pushdown(fn->btrn);
385 return 1;
386 success:
387 ret = -E_SYNC_COMPLETE; /* success */
388 goto out;
389 fail:
390 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
391 out:
392 sync_close_buddies(ctx);
393 btr_splice_out_node(&fn->btrn);
394 assert(ret < 0);
395 return ret;
396 }
397
398 /**
399 * The synchronization filter.
400 *
401 * \param f Pointer to the struct to initialize.
402 */
403 void sync_filter_init(struct filter *f)
404 {
405 struct sync_filter_args_info dummy;
406
407 sync_filter_cmdline_parser_init(&dummy);
408 f->open = sync_open;
409 f->close = sync_close;
410 f->pre_select = sync_pre_select;
411 f->post_select = sync_post_select;
412 f->parse_config = sync_parse_config;
413 f->free_config = sync_free_config;
414 f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
415 }