server: Don't pass peername to handle_connect().
[paraslash.git] / sync_filter.c
1 /* Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file sync_filter.c Playback synchronization filter. */
4
5 #include <netinet/in.h>
6 #include <sys/socket.h>
7 #include <regex.h>
8 #include <sys/types.h>
9 #include <arpa/inet.h>
10 #include <sys/un.h>
11 #include <netdb.h>
12 #include <lopsub.h>
13
14 #include "filter_cmd.lsg.h"
15 #include "para.h"
16 #include "list.h"
17 #include "net.h"
18 #include "sched.h"
19 #include "buffer_tree.h"
20 #include "filter.h"
21 #include "string.h"
22 #include "fd.h"
23 #include "error.h"
24
25 struct sync_buddy_info {
26         const char *url;
27         char *host;
28         int port;
29         struct addrinfo *ai;
30         bool disabled;
31 };
32
33 /* One active buddy */
34 struct sync_buddy {
35         int fd;
36         struct sync_buddy_info *sbi;
37         bool ping_received;
38         struct list_head node;
39 };
40
41 /* Allocated in ->open(), stored in fn->private_data */
42 struct sync_filter_context {
43         int listen_fd;
44         struct list_head buddies;
45         struct timeval timeout;
46         bool ping_sent;
47 };
48
49 #define FOR_EACH_BUDDY(_buddy, _list) \
50         list_for_each_entry(_buddy, _list, node)
51 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
52         list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
53
54 static void sync_close_buddy(struct sync_buddy *buddy)
55 {
56         PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
57         close(buddy->fd);
58         list_del(&buddy->node);
59         free(buddy);
60 }
61
62 static void sync_close_buddies(struct sync_filter_context *ctx)
63 {
64         struct sync_buddy *buddy, *tmp;
65
66         FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
67                 sync_close_buddy(buddy);
68 }
69
70 static void sync_close(struct filter_node *fn)
71 {
72         struct sync_filter_context *ctx = fn->private_data;
73
74         sync_close_buddies(ctx);
75         if (ctx->listen_fd >= 0) {
76                 close(ctx->listen_fd);
77                 ctx->listen_fd = -1;
78         }
79         free(ctx);
80         fn->private_data = NULL;
81 }
82
83 static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
84 {
85         struct sync_buddy_info *sbi = conf;
86         int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
87
88         for (i = 0; i < num_buddies; i++) {
89                 free(sbi[i].host);
90                 freeaddrinfo(sbi[i].ai);
91         }
92         free(sbi);
93 }
94
95 static void sync_open(struct filter_node *fn)
96 {
97         int i, ret;
98         struct sync_buddy *buddy;
99         struct sync_filter_context *ctx;
100         struct sync_buddy_info *sbi = fn->conf;
101         uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
102         unsigned buddy_given;
103         const struct lls_opt_result *r_b;
104
105         ctx = fn->private_data = para_calloc(sizeof(*ctx));
106         INIT_LIST_HEAD(&ctx->buddies);
107
108         /* create socket to listen for incoming packets */
109         ret = makesock(
110                 IPPROTO_UDP,
111                 true /* passive */,
112                 NULL /* no host required */,
113                 port,
114                 NULL /* no flowopts */
115         );
116         if (ret < 0) {
117                 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
118                         port);
119                 return;
120         }
121         ctx->listen_fd = ret;
122         PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
123
124         r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
125         buddy_given = lls_opt_given(r_b);
126         for (i = 0; i < buddy_given; i++) {
127                 int fd;
128                 const char *url = lls_string_val(i, r_b);
129
130                 /* make buddy udp socket from address info */
131                 assert(sbi->ai);
132                 ret = makesock_addrinfo(
133                         IPPROTO_UDP,
134                         false /* not passive */,
135                         sbi[i].ai,
136                         NULL /* no flowopts */
137                 );
138                 if (ret < 0) {
139                         PARA_WARNING_LOG("could not make socket for %s\n",
140                                 url);
141                         goto fail;
142                 }
143                 fd = ret;
144                 ret = mark_fd_nonblocking(fd);
145                 if (ret < 0) {
146                         PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
147                                 url);
148                         close(fd);
149                         goto fail;
150                 }
151                 buddy = para_malloc(sizeof(*buddy));
152                 buddy->fd = fd;
153                 buddy->sbi = sbi + i;
154                 buddy->ping_received = false;
155                 para_list_add(&buddy->node, &ctx->buddies);
156
157                 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
158                 continue;
159 fail:
160                 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
161         }
162 }
163
164 /*
165  * Build an array of struct sync_buddy_info with one entry for each buddy given
166  * in the arguments. This array is not affected by sync_close(), so information
167  * stored there can be used for multiple instances (para_audiod). We store the
168  * resolved url and the ->disabled bit in this array.
169  */
170 static void *sync_setup(const struct lls_parse_result *lpr)
171 {
172         int i, ret;
173         unsigned n;
174         struct sync_buddy_info *sbi;
175         const struct lls_opt_result *r_b;
176
177         r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
178         n = lls_opt_given(r_b);
179         sbi = para_malloc(n * sizeof(*sbi));
180         PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
181         for (i = 0; i < n; i++) {
182                 const char *url = lls_string_val(i, r_b);
183                 size_t len = strlen(url);
184                 char *host = para_malloc(len + 1);
185                 int port;
186                 struct addrinfo *ai;
187
188                 if (!parse_url(url, host, len, &port)) {
189                         PARA_ERROR_LOG("could not parse url %s\n", url);
190                         exit(EXIT_FAILURE);
191                 }
192                 if (port < 0)
193                         port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
194                 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
195                         host, port, &ai);
196                 if (ret < 0) {
197                         PARA_ERROR_LOG("host lookup failure for %s: %s\n",
198                                 url, para_strerror(-ret));
199                         exit(EXIT_FAILURE);
200                 }
201                 sbi[i].url = url;
202                 sbi[i].host = host;
203                 sbi[i].port = port;
204                 sbi[i].ai = ai;
205                 sbi[i].disabled = false;
206                 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
207         }
208         return sbi;
209 }
210
211 /*
212  * True if we sent a packet to all buddies and received a packet from each
213  * enabled buddy.
214  */
215 static bool sync_complete(struct sync_filter_context *ctx)
216 {
217         struct sync_buddy *buddy;
218
219         if (!ctx->ping_sent)
220                 return false;
221         FOR_EACH_BUDDY(buddy, &ctx->buddies)
222                 if (!buddy->sbi->disabled && !buddy->ping_received)
223                         return false;
224         return true;
225 }
226
227 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
228 {
229         struct sync_buddy *buddy;
230
231         FOR_EACH_BUDDY(buddy, &ctx->buddies) {
232                 if (buddy->sbi->disabled)
233                         continue;
234                 if (buddy->ping_received == true)
235                         continue;
236                 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
237                 buddy->sbi->disabled = true;
238         }
239 }
240
241 static void sync_set_timeout(struct sync_filter_context *ctx,
242                 struct lls_parse_result *lpr)
243 {
244         uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
245         struct timeval to;
246
247         ms2tv(ms, &to);
248         tv_add(now, &to, &ctx->timeout);
249 }
250
251 static void sync_pre_select(struct sched *s, void *context)
252 {
253         int ret;
254         struct filter_node *fn = context;
255         struct sync_filter_context *ctx = fn->private_data;
256
257         if (list_empty(&ctx->buddies))
258                 return sched_min_delay(s);
259         if (ctx->listen_fd < 0)
260                 return sched_min_delay(s);
261         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
262         if (ret < 0)
263                 return sched_min_delay(s);
264         para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
265         if (ret == 0)
266                 return;
267         if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
268                 sync_set_timeout(ctx, fn->lpr);
269                 return sched_min_delay(s);
270         }
271         if (sync_complete(ctx)) /* push down what we have */
272                 return sched_min_delay(s);
273         sched_request_barrier_or_min_delay(&ctx->timeout, s);
274 }
275
276 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
277                 struct list_head *list)
278 {
279         struct sync_buddy *buddy;
280
281         FOR_EACH_BUDDY(buddy, list)
282                 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
283                         return buddy;
284         return NULL;
285 }
286
287 static int sync_post_select(__a_unused struct sched *s, void *context)
288 {
289         int ret;
290         struct filter_node *fn = context;
291         struct sync_filter_context *ctx = fn->private_data;
292         struct sync_buddy *buddy, *tmp;
293
294         if (list_empty(&ctx->buddies))
295                 goto success;
296         ret = -E_SYNC_LISTEN_FD;
297         if (ctx->listen_fd < 0)
298                 goto fail;
299         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
300         if (ret < 0)
301                 goto fail;
302         if (ret == 0)
303                 return 0;
304         if (ctx->timeout.tv_sec == 0)
305                 sync_set_timeout(ctx, fn->lpr);
306         else {
307                 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
308                         sync_disable_active_buddies(ctx);
309                         goto success;
310                 }
311         }
312         if (!ctx->ping_sent) {
313                 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
314                         char c = '\0';
315                         PARA_INFO_LOG("pinging %s (%s)\n",
316                                 buddy->sbi->url, buddy->sbi->disabled?
317                                 "disabled" : "enabled");
318                         ret = xwrite(buddy->fd, &c, 1);
319                         if (ret < 0) {
320                                 PARA_WARNING_LOG("failed to write to %s: %s\n",
321                                         buddy->sbi->url, para_strerror(-ret));
322                                 sync_close_buddy(buddy);
323                         }
324                 }
325                 ctx->ping_sent = true;
326         }
327         if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
328                 char c;
329                 for (;;) {
330                         struct sockaddr src_addr;
331                         socklen_t len = sizeof(src_addr);
332                         ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
333                                 &src_addr, &len);
334                         if (ret < 0) {
335                                 if (errno == EAGAIN || errno == EWOULDBLOCK)
336                                         break;
337                                 ret = -ERRNO_TO_PARA_ERROR(errno);
338                                 goto fail;
339                         }
340                         buddy = sync_find_buddy(&src_addr, &ctx->buddies);
341                         if (!buddy) {
342                                 PARA_NOTICE_LOG("pinged by unknown\n");
343                                 continue;
344                         }
345                         PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
346                         if (buddy->sbi->disabled) {
347                                 PARA_NOTICE_LOG("enabling %s\n",
348                                         buddy->sbi->url);
349                                 buddy->sbi->disabled = false;
350                         }
351                         buddy->ping_received = true;
352                 }
353         }
354         if (!sync_complete(ctx))
355                 return 1;
356         /*
357          * Although all enabled buddies are in sync we do not splice out
358          * ourselves immediately. We rather wait until the timeout expires,
359          * or the buddy list has become empty. This opens a time window
360          * for disabled buddies to become enabled by sending us a packet.
361          */
362         btr_pushdown(fn->btrn);
363         return 1;
364 success:
365         ret = -E_SYNC_COMPLETE; /* success */
366         goto out;
367 fail:
368         PARA_WARNING_LOG("%s\n", para_strerror(-ret));
369 out:
370         sync_close_buddies(ctx);
371         btr_splice_out_node(&fn->btrn);
372         assert(ret < 0);
373         return ret;
374 }
375
376 const struct filter lsg_filter_cmd_com_sync_user_data = {
377         .setup = sync_setup,
378         .open = sync_open,
379         .pre_select = sync_pre_select,
380         .post_select = sync_post_select,
381         .close = sync_close,
382         .teardown = sync_teardown
383 };