build: Convert speex detection to new macros.
[paraslash.git] / sync_filter.c
1 /*
2  * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file sync_filter.c Playback synchronization filter. */
8
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <regex.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
14 #include <sys/un.h>
15 #include <netdb.h>
16
17 #include "para.h"
18 #include "sync_filter.cmdline.h"
19 #include "list.h"
20 #include "net.h"
21 #include "sched.h"
22 #include "ggo.h"
23 #include "buffer_tree.h"
24 #include "filter.h"
25 #include "string.h"
26 #include "fd.h"
27 #include "error.h"
28
/*
 * Per-buddy configuration data. One instance exists for each buddy given in
 * the arguments; the instances live in the array that the filter config
 * points to.
 */
struct sync_buddy_info {
	/* The buddy argument exactly as given by the user (not owned). */
	const char *url;
	/* Host part of the url, heap-allocated. */
	char *host;
	/* Port of the buddy. */
	int port;
	/* Result of the address lookup for host and port. */
	struct addrinfo *ai;
	/* True if the buddy is currently not waited for. */
	bool disabled;
};
36
37 /* One active buddy */
38 struct sync_buddy {
39         int fd;
40         struct sync_buddy_info *sbi;
41         bool ping_received;
42         struct list_head node;
43 };
44
45 /* Allocated in ->open() */
46 struct sync_filter_context {
47         int listen_fd;
48         struct list_head buddies;
49         struct timeval timeout;
50         bool ping_sent;
51 };
52
/*
 * Filter configuration. Created by ->parse_config() and released by
 * ->free_config().
 */
struct sync_filter_config {
	/* The parsed command line arguments. */
	struct sync_filter_args_info *conf;
	/* Array with one entry for each buddy given in the arguments. */
	struct sync_buddy_info *buddy_info;
};
58
/* Visit each struct sync_buddy which is linked into _head via ->node. */
#define FOR_EACH_BUDDY(_b, _head) \
	list_for_each_entry(_b, _head, node)
/*
 * Variant based on list_for_each_entry_safe(), with _next as the additional
 * cursor. Used where the loop body unlinks the current buddy.
 */
#define FOR_EACH_BUDDY_SAFE(_b, _next, _head) \
	list_for_each_entry_safe(_b, _next, _head, node)
63
64 static void sync_close_buddy(struct sync_buddy *buddy)
65 {
66         PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
67         close(buddy->fd);
68         list_del(&buddy->node);
69         free(buddy);
70 }
71
72 static void sync_close_buddies(struct sync_filter_context *ctx)
73 {
74         struct sync_buddy *buddy, *tmp;
75
76         FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
77                 sync_close_buddy(buddy);
78 }
79
80 static void sync_close(struct filter_node *fn)
81 {
82         struct sync_filter_context *ctx = fn->private_data;
83
84         sync_close_buddies(ctx);
85         if (ctx->listen_fd >= 0) {
86                 close(ctx->listen_fd);
87                 ctx->listen_fd = -1;
88         }
89         free(ctx);
90         fn->private_data = NULL;
91 }
92
93 static void sync_free_config(void *conf)
94 {
95         struct sync_filter_config *sfc = conf;
96         int i;
97
98         for (i = 0; i < sfc->conf->buddy_given; i++) {
99                 free(sfc->buddy_info[i].host);
100                 freeaddrinfo(sfc->buddy_info[i].ai);
101         }
102         sync_filter_cmdline_parser_free(sfc->conf);
103         free(sfc);
104 }
105
106 static void sync_open(struct filter_node *fn)
107 {
108         int i, ret;
109         struct sync_filter_config *sfc = fn->conf;
110         struct sync_buddy *buddy;
111         struct sync_filter_context *ctx;
112
113         assert(sfc);
114
115         ctx = fn->private_data = para_calloc(sizeof(*ctx));
116         INIT_LIST_HEAD(&ctx->buddies);
117         ctx->listen_fd = -1;
118
119         /* create socket to listen for incoming packets */
120         ret = makesock(
121                 IPPROTO_UDP,
122                 true /* passive */,
123                 NULL /* no host required */,
124                 sfc->conf->port_arg,
125                 NULL /* no flowopts */
126         );
127         if (ret < 0) {
128                 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
129                         sfc->conf->port_arg);
130                 return;
131         }
132         ctx->listen_fd = ret;
133         PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
134
135         for (i = 0; i < sfc->conf->buddy_given; i++) {
136                 struct sync_buddy_info *sbi = sfc->buddy_info + i;
137                 const char *url = sfc->conf->buddy_arg[i];
138                 int fd;
139
140                 /* make buddy udp socket from address info */
141                 assert(sbi->ai);
142                 ret = makesock_addrinfo(
143                         IPPROTO_UDP,
144                         false /* not passive */,
145                         sbi->ai,
146                         NULL /* no flowopts */
147                 );
148                 if (ret < 0) {
149                         PARA_WARNING_LOG("could not make socket for %s\n",
150                                 url);
151                         goto fail;
152                 }
153                 fd = ret;
154                 ret = mark_fd_nonblocking(fd);
155                 if (ret < 0) {
156                         PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
157                                 url);
158                         close(fd);
159                         goto fail;
160                 }
161                 buddy = para_malloc(sizeof(*buddy));
162                 buddy->fd = fd;
163                 buddy->sbi = sbi;
164                 buddy->ping_received = false;
165                 para_list_add(&buddy->node, &ctx->buddies);
166
167                 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
168                 continue;
169 fail:
170                 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
171         }
172 }
173
174 /*
175  * At parse config time, we build an array of struct sync_buddy_info with one
176  * entry for each buddy given in the arguments. This array is not affected by
177  * sync_close(), so information stored there can be used for multiple instances
178  * (para_audiod). We store the resolved url and the ->disabled bit in this
179  * array.
180  */
181 static int sync_parse_config(int argc, char **argv, void **result)
182 {
183         int i, ret, n;
184         struct sync_filter_config *sfc;
185         struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
186
187         sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
188         sfc = para_calloc(sizeof(*sfc));
189         sfc->conf = conf;
190         n = conf->buddy_given;
191         sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
192         PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
193         for (i = 0; i < n; i++) {
194                 const char *url = conf->buddy_arg[i];
195                 size_t len = strlen(url);
196                 char *host = para_malloc(len + 1);
197                 int port;
198                 struct addrinfo *ai;
199                 struct sync_buddy_info *sbi = sfc->buddy_info + i;
200
201                 if (!parse_url(url, host, len, &port)) {
202                         free(host);
203                         PARA_ERROR_LOG("could not parse url %s\n", url);
204                         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
205                         goto fail;
206                 }
207                 if (port < 0)
208                         port = conf->port_arg;
209                 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
210                         host, port, &ai);
211                 if (ret < 0) {
212                         PARA_ERROR_LOG("host lookup failure for %s\n", url);
213                         free(host);
214                         goto fail;
215                 }
216                 sbi->url = url;
217                 sbi->host = host;
218                 sbi->port = port;
219                 sbi->ai = ai;
220                 sbi->disabled = false;
221                 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
222         }
223         *result = sfc;
224         return 1;
225 fail:
226         assert(ret < 0);
227         PARA_ERROR_LOG("%s\n", para_strerror(-ret));
228         sync_free_config(sfc);
229         return ret;
230 }
231
232 /*
233  * True if we sent a packet to all buddies and received a packet from each
234  * enabled buddy.
235  */
236 static bool sync_complete(struct sync_filter_context *ctx)
237 {
238         struct sync_buddy *buddy;
239
240         if (!ctx->ping_sent)
241                 return false;
242         FOR_EACH_BUDDY(buddy, &ctx->buddies)
243                 if (!buddy->sbi->disabled && !buddy->ping_received)
244                         return false;
245         return true;
246 }
247
248 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
249 {
250         struct sync_buddy *buddy;
251
252         FOR_EACH_BUDDY(buddy, &ctx->buddies) {
253                 if (buddy->sbi->disabled)
254                         continue;
255                 if (buddy->ping_received == true)
256                         continue;
257                 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
258                 buddy->sbi->disabled = true;
259         }
260 }
261
262 static void sync_set_timeout(struct sync_filter_context *ctx,
263                 struct sync_filter_config *sfc)
264 {
265         struct timeval to;
266
267         ms2tv(sfc->conf->timeout_arg, &to);
268         tv_add(now, &to, &ctx->timeout);
269 }
270
271 static void sync_pre_select(struct sched *s, void *context)
272 {
273         int ret;
274         struct filter_node *fn = context;
275         struct sync_filter_context *ctx = fn->private_data;
276         struct sync_filter_config *sfc = fn->conf;
277
278         if (list_empty(&ctx->buddies))
279                 return sched_min_delay(s);
280         if (ctx->listen_fd < 0)
281                 return sched_min_delay(s);
282         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
283         if (ret < 0)
284                 return sched_min_delay(s);
285         para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
286         if (ret == 0)
287                 return;
288         if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
289                 sync_set_timeout(ctx, sfc);
290                 return sched_min_delay(s);
291         }
292         if (sync_complete(ctx)) /* push down what we have */
293                 return sched_min_delay(s);
294         sched_request_barrier_or_min_delay(&ctx->timeout, s);
295 }
296
297 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
298                 struct list_head *list)
299 {
300         struct sync_buddy *buddy;
301
302         FOR_EACH_BUDDY(buddy, list)
303                 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
304                         return buddy;
305         return NULL;
306 }
307
308 static int sync_post_select(__a_unused struct sched *s, void *context)
309 {
310         int ret;
311         struct filter_node *fn = context;
312         struct sync_filter_context *ctx = fn->private_data;
313         struct sync_filter_config *sfc = fn->conf;
314         struct sync_buddy *buddy, *tmp;
315
316         if (list_empty(&ctx->buddies))
317                 goto success;
318         ret = -E_SYNC_LISTEN_FD;
319         if (ctx->listen_fd < 0)
320                 goto fail;
321         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
322         if (ret < 0)
323                 goto fail;
324         if (ret == 0)
325                 return 0;
326         if (ctx->timeout.tv_sec == 0)
327                 sync_set_timeout(ctx, sfc);
328         else {
329                 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
330                         sync_disable_active_buddies(ctx);
331                         goto success;
332                 }
333         }
334         if (!ctx->ping_sent) {
335                 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
336                         char c = '\0';
337                         PARA_INFO_LOG("pinging %s (%s)\n",
338                                 buddy->sbi->url, buddy->sbi->disabled?
339                                 "disabled" : "enabled");
340                         ret = xwrite(buddy->fd, &c, 1);
341                         if (ret < 0) {
342                                 PARA_WARNING_LOG("failed to write to %s: %s\n",
343                                         buddy->sbi->url, para_strerror(-ret));
344                                 sync_close_buddy(buddy);
345                         }
346                 }
347                 ctx->ping_sent = true;
348         }
349         if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
350                 char c;
351                 for (;;) {
352                         struct sockaddr src_addr;
353                         socklen_t len = sizeof(src_addr);
354                         ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
355                                 &src_addr, &len);
356                         if (ret < 0) {
357                                 if (errno == EAGAIN || errno == EWOULDBLOCK)
358                                         break;
359                                 ret = -ERRNO_TO_PARA_ERROR(errno);
360                                 goto fail;
361                         }
362                         buddy = sync_find_buddy(&src_addr, &ctx->buddies);
363                         if (!buddy) {
364                                 PARA_NOTICE_LOG("pinged by unknown\n");
365                                 continue;
366                         }
367                         PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
368                         if (buddy->sbi->disabled) {
369                                 PARA_NOTICE_LOG("enabling %s\n",
370                                         buddy->sbi->url);
371                                 buddy->sbi->disabled = false;
372                         }
373                         buddy->ping_received = true;
374                 }
375         }
376         if (!sync_complete(ctx))
377                 return 1;
378         /*
379          * Although all enabled buddies are in sync we do not splice out
380          * ourselves immediately. We rather wait until the timeout expires,
381          * or the buddy list has become empty. This opens a time window
382          * for disabled buddies to become enabled by sending us a packet.
383          */
384         btr_pushdown(fn->btrn);
385         return 1;
386 success:
387         ret = -E_SYNC_COMPLETE; /* success */
388         goto out;
389 fail:
390         PARA_WARNING_LOG("%s\n", para_strerror(-ret));
391 out:
392         sync_close_buddies(ctx);
393         btr_splice_out_node(&fn->btrn);
394         assert(ret < 0);
395         return ret;
396 }
397
398 /**
399  * The synchronization filter.
400  *
401  * \param f Pointer to the struct to initialize.
402  */
403 void sync_filter_init(struct filter *f)
404 {
405         struct sync_filter_args_info dummy;
406
407         sync_filter_cmdline_parser_init(&dummy);
408         f->open = sync_open;
409         f->close = sync_close;
410         f->pre_select = sync_pre_select;
411         f->post_select = sync_post_select;
412         f->parse_config = sync_parse_config;
413         f->free_config = sync_free_config;
414         f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
415 }