vss: Improve comment about sending empty chunks.
[paraslash.git] / sync_filter.c
1 /*
2  * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file sync_filter.c Playback synchronization filter. */
8
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <regex.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
14 #include <sys/un.h>
15 #include <netdb.h>
16 #include <lopsub.h>
17
18 #include "filter_cmd.lsg.h"
19 #include "para.h"
20 #include "list.h"
21 #include "net.h"
22 #include "sched.h"
23 #include "buffer_tree.h"
24 #include "filter.h"
25 #include "string.h"
26 #include "fd.h"
27 #include "error.h"
28
/*
 * Per-buddy configuration. sync_setup() builds an array with one entry for
 * each buddy argument; sync_teardown() releases it.
 */
struct sync_buddy_info {
        /* The buddy argument as returned by lls_string_val(). Not freed here. */
        const char *url;
        /* Host part of the url, allocated in sync_setup(). */
        char *host;
        /* Port parsed from the url, or the value of the port option. */
        int port;
        /* Result of the address lookup, released with freeaddrinfo(). */
        struct addrinfo *ai;
        /*
         * Set when the timeout expired without a ping from this buddy,
         * cleared again as soon as the buddy pings us.
         */
        bool disabled;
};
36
/* One active buddy, created in sync_open() and linked into ctx->buddies. */
struct sync_buddy {
        /* Non-blocking UDP socket used to send the ping to this buddy. */
        int fd;
        /* Points to the matching entry of the buddy info array. */
        struct sync_buddy_info *sbi;
        /* Whether a packet from this buddy's address has been received. */
        bool ping_received;
        /* Position in the buddy list of the filter context. */
        struct list_head node;
};
44
/* Allocated in ->open(), stored in fn->private_data */
struct sync_filter_context {
        /* UDP socket on which packets from the buddies are received. */
        int listen_fd;
        /* List of struct sync_buddy, one per successfully opened buddy. */
        struct list_head buddies;
        /*
         * Absolute time at which we stop waiting for buddies. A tv_sec value
         * of zero is treated as "not yet set" by the select hooks.
         */
        struct timeval timeout;
        /* True once the ping loop in the post_select hook has run. */
        bool ping_sent;
};
52
/* Iterate over all struct sync_buddy entries linked into _list. */
#define FOR_EACH_BUDDY(_buddy, _list) \
        list_for_each_entry(_buddy, _list, node)
/*
 * Like FOR_EACH_BUDDY, but built on list_for_each_entry_safe() with
 * _tmp_buddy as the extra cursor. Used where the loop body removes the
 * current buddy from the list.
 */
#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
        list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
57
58 static void sync_close_buddy(struct sync_buddy *buddy)
59 {
60         PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
61         close(buddy->fd);
62         list_del(&buddy->node);
63         free(buddy);
64 }
65
66 static void sync_close_buddies(struct sync_filter_context *ctx)
67 {
68         struct sync_buddy *buddy, *tmp;
69
70         FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
71                 sync_close_buddy(buddy);
72 }
73
74 static void sync_close(struct filter_node *fn)
75 {
76         struct sync_filter_context *ctx = fn->private_data;
77
78         sync_close_buddies(ctx);
79         if (ctx->listen_fd >= 0) {
80                 close(ctx->listen_fd);
81                 ctx->listen_fd = -1;
82         }
83         free(ctx);
84         fn->private_data = NULL;
85 }
86
87 static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
88 {
89         struct sync_buddy_info *sbi = conf;
90         int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
91
92         for (i = 0; i < num_buddies; i++) {
93                 free(sbi[i].host);
94                 freeaddrinfo(sbi[i].ai);
95         }
96         free(sbi);
97 }
98
99 static void sync_open(struct filter_node *fn)
100 {
101         int i, ret;
102         struct sync_buddy *buddy;
103         struct sync_filter_context *ctx;
104         struct sync_buddy_info *sbi = fn->conf;
105         uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
106         unsigned buddy_given;
107         const struct lls_opt_result *r_b;
108
109         ctx = fn->private_data = para_calloc(sizeof(*ctx));
110         INIT_LIST_HEAD(&ctx->buddies);
111
112         /* create socket to listen for incoming packets */
113         ret = makesock(
114                 IPPROTO_UDP,
115                 true /* passive */,
116                 NULL /* no host required */,
117                 port,
118                 NULL /* no flowopts */
119         );
120         if (ret < 0) {
121                 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
122                         port);
123                 return;
124         }
125         ctx->listen_fd = ret;
126         PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
127
128         r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
129         buddy_given = lls_opt_given(r_b);
130         for (i = 0; i < buddy_given; i++) {
131                 int fd;
132                 const char *url = lls_string_val(i, r_b);
133
134                 /* make buddy udp socket from address info */
135                 assert(sbi->ai);
136                 ret = makesock_addrinfo(
137                         IPPROTO_UDP,
138                         false /* not passive */,
139                         sbi[i].ai,
140                         NULL /* no flowopts */
141                 );
142                 if (ret < 0) {
143                         PARA_WARNING_LOG("could not make socket for %s\n",
144                                 url);
145                         goto fail;
146                 }
147                 fd = ret;
148                 ret = mark_fd_nonblocking(fd);
149                 if (ret < 0) {
150                         PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
151                                 url);
152                         close(fd);
153                         goto fail;
154                 }
155                 buddy = para_malloc(sizeof(*buddy));
156                 buddy->fd = fd;
157                 buddy->sbi = sbi + i;
158                 buddy->ping_received = false;
159                 para_list_add(&buddy->node, &ctx->buddies);
160
161                 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
162                 continue;
163 fail:
164                 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
165         }
166 }
167
168 /*
169  * Build an array of struct sync_buddy_info with one entry for each buddy given
170  * in the arguments. This array is not affected by sync_close(), so information
171  * stored there can be used for multiple instances (para_audiod). We store the
172  * resolved url and the ->disabled bit in this array.
173  */
174 static void *sync_setup(const struct lls_parse_result *lpr)
175 {
176         int i, ret;
177         unsigned n;
178         struct sync_buddy_info *sbi;
179         const struct lls_opt_result *r_b;
180
181         r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
182         n = lls_opt_given(r_b);
183         sbi = para_malloc(n * sizeof(*sbi));
184         PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
185         for (i = 0; i < n; i++) {
186                 const char *url = lls_string_val(i, r_b);
187                 size_t len = strlen(url);
188                 char *host = para_malloc(len + 1);
189                 int port;
190                 struct addrinfo *ai;
191
192                 if (!parse_url(url, host, len, &port)) {
193                         PARA_ERROR_LOG("could not parse url %s\n", url);
194                         exit(EXIT_FAILURE);
195                 }
196                 if (port < 0)
197                         port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
198                 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
199                         host, port, &ai);
200                 if (ret < 0) {
201                         PARA_ERROR_LOG("host lookup failure for %s: %s\n",
202                                 url, para_strerror(-ret));
203                         exit(EXIT_FAILURE);
204                 }
205                 sbi[i].url = url;
206                 sbi[i].host = host;
207                 sbi[i].port = port;
208                 sbi[i].ai = ai;
209                 sbi[i].disabled = false;
210                 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
211         }
212         return sbi;
213 }
214
215 /*
216  * True if we sent a packet to all buddies and received a packet from each
217  * enabled buddy.
218  */
219 static bool sync_complete(struct sync_filter_context *ctx)
220 {
221         struct sync_buddy *buddy;
222
223         if (!ctx->ping_sent)
224                 return false;
225         FOR_EACH_BUDDY(buddy, &ctx->buddies)
226                 if (!buddy->sbi->disabled && !buddy->ping_received)
227                         return false;
228         return true;
229 }
230
231 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
232 {
233         struct sync_buddy *buddy;
234
235         FOR_EACH_BUDDY(buddy, &ctx->buddies) {
236                 if (buddy->sbi->disabled)
237                         continue;
238                 if (buddy->ping_received == true)
239                         continue;
240                 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
241                 buddy->sbi->disabled = true;
242         }
243 }
244
245 static void sync_set_timeout(struct sync_filter_context *ctx,
246                 struct lls_parse_result *lpr)
247 {
248         uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
249         struct timeval to;
250
251         ms2tv(ms, &to);
252         tv_add(now, &to, &ctx->timeout);
253 }
254
255 static void sync_pre_select(struct sched *s, void *context)
256 {
257         int ret;
258         struct filter_node *fn = context;
259         struct sync_filter_context *ctx = fn->private_data;
260
261         if (list_empty(&ctx->buddies))
262                 return sched_min_delay(s);
263         if (ctx->listen_fd < 0)
264                 return sched_min_delay(s);
265         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
266         if (ret < 0)
267                 return sched_min_delay(s);
268         para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
269         if (ret == 0)
270                 return;
271         if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
272                 sync_set_timeout(ctx, fn->lpr);
273                 return sched_min_delay(s);
274         }
275         if (sync_complete(ctx)) /* push down what we have */
276                 return sched_min_delay(s);
277         sched_request_barrier_or_min_delay(&ctx->timeout, s);
278 }
279
280 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
281                 struct list_head *list)
282 {
283         struct sync_buddy *buddy;
284
285         FOR_EACH_BUDDY(buddy, list)
286                 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
287                         return buddy;
288         return NULL;
289 }
290
291 static int sync_post_select(__a_unused struct sched *s, void *context)
292 {
293         int ret;
294         struct filter_node *fn = context;
295         struct sync_filter_context *ctx = fn->private_data;
296         struct sync_buddy *buddy, *tmp;
297
298         if (list_empty(&ctx->buddies))
299                 goto success;
300         ret = -E_SYNC_LISTEN_FD;
301         if (ctx->listen_fd < 0)
302                 goto fail;
303         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
304         if (ret < 0)
305                 goto fail;
306         if (ret == 0)
307                 return 0;
308         if (ctx->timeout.tv_sec == 0)
309                 sync_set_timeout(ctx, fn->lpr);
310         else {
311                 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
312                         sync_disable_active_buddies(ctx);
313                         goto success;
314                 }
315         }
316         if (!ctx->ping_sent) {
317                 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
318                         char c = '\0';
319                         PARA_INFO_LOG("pinging %s (%s)\n",
320                                 buddy->sbi->url, buddy->sbi->disabled?
321                                 "disabled" : "enabled");
322                         ret = xwrite(buddy->fd, &c, 1);
323                         if (ret < 0) {
324                                 PARA_WARNING_LOG("failed to write to %s: %s\n",
325                                         buddy->sbi->url, para_strerror(-ret));
326                                 sync_close_buddy(buddy);
327                         }
328                 }
329                 ctx->ping_sent = true;
330         }
331         if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
332                 char c;
333                 for (;;) {
334                         struct sockaddr src_addr;
335                         socklen_t len = sizeof(src_addr);
336                         ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
337                                 &src_addr, &len);
338                         if (ret < 0) {
339                                 if (errno == EAGAIN || errno == EWOULDBLOCK)
340                                         break;
341                                 ret = -ERRNO_TO_PARA_ERROR(errno);
342                                 goto fail;
343                         }
344                         buddy = sync_find_buddy(&src_addr, &ctx->buddies);
345                         if (!buddy) {
346                                 PARA_NOTICE_LOG("pinged by unknown\n");
347                                 continue;
348                         }
349                         PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
350                         if (buddy->sbi->disabled) {
351                                 PARA_NOTICE_LOG("enabling %s\n",
352                                         buddy->sbi->url);
353                                 buddy->sbi->disabled = false;
354                         }
355                         buddy->ping_received = true;
356                 }
357         }
358         if (!sync_complete(ctx))
359                 return 1;
360         /*
361          * Although all enabled buddies are in sync we do not splice out
362          * ourselves immediately. We rather wait until the timeout expires,
363          * or the buddy list has become empty. This opens a time window
364          * for disabled buddies to become enabled by sending us a packet.
365          */
366         btr_pushdown(fn->btrn);
367         return 1;
368 success:
369         ret = -E_SYNC_COMPLETE; /* success */
370         goto out;
371 fail:
372         PARA_WARNING_LOG("%s\n", para_strerror(-ret));
373 out:
374         sync_close_buddies(ctx);
375         btr_splice_out_node(&fn->btrn);
376         assert(ret < 0);
377         return ret;
378 }
379
/*
 * The method table of the sync filter.
 *
 * ->setup() and ->teardown() manage the buddy info array. ->open() and
 * ->close() manage the per-instance context. The two select hooks
 * implement the ping exchange.
 *
 * NOTE(review): the symbol name presumably has to match what the lopsub
 * generated filter command code refers to. Confirm before renaming.
 */
const struct filter lsg_filter_cmd_com_sync_user_data = {
        .setup = sync_setup,
        .open = sync_open,
        .pre_select = sync_pre_select,
        .post_select = sync_post_select,
        .close = sync_close,
        .teardown = sync_teardown
};