gui: Reset command buf offset on errors.
[paraslash.git] / sync_filter.c
1 /*
2  * Copyright (C) 2013 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file sync_filter.c Playback synchronization filter. */
8
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <regex.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
14 #include <sys/un.h>
15 #include <netdb.h>
16
17 #include "para.h"
18 #include "sync_filter.cmdline.h"
19 #include "list.h"
20 #include "net.h"
21 #include "sched.h"
22 #include "ggo.h"
23 #include "buffer_tree.h"
24 #include "filter.h"
25 #include "string.h"
26 #include "fd.h"
27 #include "error.h"
28
29 struct sync_buddy_info {
30         const char *url;
31         char *host;
32         int port;
33         struct addrinfo *ai;
34         bool disabled;
35 };
36
37 /* per open/close data */
38 struct sync_buddy {
39         int fd;
40         struct sync_buddy_info *sbi;
41         bool ping_received;
42         struct list_head node;
43 };
44
45 struct sync_filter_context {
46         int listen_fd;
47         struct list_head buddies;
48         struct timeval timeout;
49         bool ping_sent;
50 };
51
52 struct sync_filter_config {
53         struct sync_filter_args_info *conf;
54         struct sync_buddy_info *buddy_info;
55 };
56
57 #define FOR_EACH_BUDDY(_buddy, _list) \
58         list_for_each_entry(_buddy, _list, node)
59 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
60         list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
61
62 static void sync_close_buddy(struct sync_buddy *buddy)
63 {
64         if (buddy->fd < 0)
65                 return;
66         PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
67         close(buddy->fd);
68         buddy->fd = -1;
69 }
70
71 static void sync_close_buddies(struct sync_filter_context *ctx)
72 {
73         struct sync_buddy *buddy;
74
75         FOR_EACH_BUDDY(buddy, &ctx->buddies)
76                 sync_close_buddy(buddy);
77 }
78
79 static void sync_close(struct filter_node *fn)
80 {
81         struct sync_filter_context *ctx = fn->private_data;
82
83         sync_close_buddies(ctx);
84         if (ctx->listen_fd >= 0) {
85                 close(ctx->listen_fd);
86                 ctx->listen_fd = -1;
87         }
88         free(ctx);
89         fn->private_data = NULL;
90 }
91
92 static void sync_free_config(void *conf)
93 {
94         struct sync_filter_config *sfc = conf;
95         int i;
96
97         for (i = 0; i < sfc->conf->buddy_given; i++) {
98                 free(sfc->buddy_info[i].host);
99                 freeaddrinfo(sfc->buddy_info[i].ai);
100         }
101         sync_filter_cmdline_parser_free(sfc->conf);
102         free(sfc);
103 }
104
105 static void sync_open(struct filter_node *fn)
106 {
107         int i, ret;
108         struct sync_filter_config *sfc = fn->conf;
109         struct sync_buddy *buddy;
110         struct sync_filter_context *ctx;
111
112         assert(sfc);
113
114         ctx = fn->private_data = para_calloc(sizeof(*ctx));
115         INIT_LIST_HEAD(&ctx->buddies);
116         ctx->listen_fd = -1;
117
118         /* create socket to listen for incoming packets */
119         ret = makesock(
120                 IPPROTO_UDP,
121                 true /* passive */,
122                 NULL /* no host required */,
123                 sfc->conf->port_arg,
124                 NULL /* no flowopts */
125         );
126         if (ret < 0) {
127                 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
128                         sfc->conf->port_arg);
129                 return;
130         }
131         ctx->listen_fd = ret;
132         PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
133
134         for (i = 0; i < sfc->conf->buddy_given; i++) {
135                 struct sync_buddy_info *sbi = sfc->buddy_info + i;
136                 const char *url = sfc->conf->buddy_arg[i];
137                 int fd;
138
139                 /* make buddy udp socket from address info */
140                 assert(sbi->ai);
141                 ret = makesock_addrinfo(
142                         IPPROTO_UDP,
143                         false /* not passive */,
144                         sbi->ai,
145                         NULL /* no flowopts */
146                 );
147                 if (ret < 0) {
148                         PARA_WARNING_LOG("could not make socket for %s\n",
149                                 url);
150                         goto fail;
151                 }
152                 fd = ret;
153                 ret = mark_fd_nonblocking(fd);
154                 if (ret < 0) {
155                         PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
156                                 url);
157                         close(fd);
158                         goto fail;
159                 }
160                 buddy = para_malloc(sizeof(*buddy));
161                 buddy->fd = fd;
162                 buddy->sbi = sbi;
163                 buddy->ping_received = false;
164                 para_list_add(&buddy->node, &ctx->buddies);
165
166                 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
167                 continue;
168 fail:
169                 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
170         }
171 }
172
173 static int sync_parse_config(int argc, char **argv, void **result)
174 {
175         int i, ret, n;
176         struct sync_filter_config *sfc;
177         struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
178
179         sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
180         sfc = para_calloc(sizeof(*sfc));
181         sfc->conf = conf;
182         n = conf->buddy_given;
183         sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
184         PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
185         for (i = 0; i < n; i++) {
186                 const char *url = conf->buddy_arg[i];
187                 size_t len = strlen(url);
188                 char *host = para_malloc(len + 1);
189                 int port;
190                 struct addrinfo *ai;
191                 struct sync_buddy_info *sbi = sfc->buddy_info + i;
192
193                 if (!parse_url(url, host, len, &port)) {
194                         free(host);
195                         PARA_ERROR_LOG("could not parse url %s\n", url);
196                         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
197                         goto fail;
198                 }
199                 if (port < 0)
200                         port = conf->port_arg;
201                 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
202                         host, port, &ai);
203                 if (ret < 0) {
204                         PARA_ERROR_LOG("host lookup failure for %s\n", url);
205                         free(host);
206                         goto fail;
207                 }
208                 sbi->url = url;
209                 sbi->host = host;
210                 sbi->port = port;
211                 sbi->ai = ai;
212                 sbi->disabled = false;
213                 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
214         }
215         *result = sfc;
216         return 1;
217 fail:
218         assert(ret < 0);
219         PARA_ERROR_LOG("%s\n", para_strerror(-ret));
220         sync_free_config(sfc);
221         return ret;
222 }
223
224 /*
225  * True if we sent a packet to all budies and received a packet from each
226  * enabled buddy.
227  */
228 static bool sync_complete(struct sync_filter_context *ctx)
229 {
230         struct sync_buddy *buddy;
231
232         if (!ctx->ping_sent)
233                 return false;
234         FOR_EACH_BUDDY(buddy, &ctx->buddies)
235                 if (!buddy->sbi->disabled && !buddy->ping_received)
236                         return false;
237         return true;
238 }
239
240 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
241 {
242         struct sync_buddy *buddy;
243
244         FOR_EACH_BUDDY(buddy, &ctx->buddies) {
245                 if (buddy->sbi->disabled)
246                         continue;
247                 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
248                 buddy->sbi->disabled = true;
249         }
250 }
251
252 static void sync_set_timeout(struct sync_filter_context *ctx,
253                 struct sync_filter_config *sfc)
254 {
255         struct timeval to;
256
257         ms2tv(sfc->conf->timeout_arg, &to);
258         tv_add(now, &to, &ctx->timeout);
259 }
260
261 static void sync_pre_select(struct sched *s, struct task *t)
262 {
263         int ret;
264         struct filter_node *fn = container_of(t, struct filter_node, task);
265         struct sync_filter_context *ctx = fn->private_data;
266         struct sync_filter_config *sfc = fn->conf;
267
268         if (list_empty(&ctx->buddies))
269                 return sched_min_delay(s);
270         if (ctx->listen_fd < 0)
271                 return sched_min_delay(s);
272         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
273         if (ret < 0)
274                 return sched_min_delay(s);
275         para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
276         if (ret == 0)
277                 return;
278         if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
279                 sync_set_timeout(ctx, sfc);
280                 return sched_min_delay(s);
281         }
282         if (sync_complete(ctx)) /* push down what we have */
283                 return sched_min_delay(s);
284         sched_request_barrier_or_min_delay(&ctx->timeout, s);
285 }
286
287 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
288                 struct list_head *list)
289 {
290         struct sync_buddy *buddy;
291
292         FOR_EACH_BUDDY(buddy, list)
293                 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
294                         return buddy;
295         return NULL;
296 }
297
298 static int sync_post_select(__a_unused struct sched *s, struct task *t)
299 {
300         int ret;
301         struct filter_node *fn = container_of(t, struct filter_node, task);
302         struct sync_filter_context *ctx = fn->private_data;
303         struct sync_filter_config *sfc = fn->conf;
304         struct sync_buddy *buddy, *tmp;
305
306         if (list_empty(&ctx->buddies))
307                 goto success;
308         ret = -E_SYNC_LISTEN_FD;
309         if (ctx->listen_fd < 0)
310                 goto fail;
311         ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
312         if (ret < 0)
313                 goto fail;
314         if (ret == 0)
315                 return 0;
316         if (ctx->timeout.tv_sec == 0)
317                 sync_set_timeout(ctx, sfc);
318         else {
319                 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
320                         sync_disable_active_buddies(ctx);
321                         goto success;
322                 }
323         }
324         if (!ctx->ping_sent) {
325                 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
326                         char c = '\0';
327                         PARA_INFO_LOG("pinging %s (%s)\n",
328                                 buddy->sbi->url, buddy->sbi->disabled?
329                                 "disabled" : "enabled");
330                         ret = xwrite(buddy->fd, &c, 1);
331                         sync_close_buddy(buddy);
332                         if (ret < 0) {
333                                 PARA_WARNING_LOG("failed to write to %s: %s\n",
334                                         buddy->sbi->url, para_strerror(-ret));
335                                 list_del(&buddy->node);
336                         }
337                 }
338                 ctx->ping_sent = true;
339         }
340         if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
341                 char c;
342                 for (;;) {
343                         struct sockaddr src_addr;
344                         socklen_t len = sizeof(src_addr);
345                         ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
346                                 &src_addr, &len);
347                         if (ret < 0) {
348                                 if (errno == EAGAIN || errno == EWOULDBLOCK)
349                                         break;
350                                 ret = -ERRNO_TO_PARA_ERROR(errno);
351                                 goto fail;
352                         }
353                         buddy = sync_find_buddy(&src_addr, &ctx->buddies);
354                         if (!buddy) {
355                                 PARA_NOTICE_LOG("pinged by unknown\n");
356                                 continue;
357                         }
358                         PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
359                         if (buddy->sbi->disabled) {
360                                 PARA_NOTICE_LOG("enabling %s\n",
361                                         buddy->sbi->url);
362                                 buddy->sbi->disabled = false;
363                         }
364                         list_del(&buddy->node);
365                 }
366         }
367         if (!sync_complete(ctx))
368                 return 1;
369         /*
370          * Although all enabled buddies are in sync we do not splice out
371          * ourselves immediately. We rather wait until the timout expires,
372          * or the buddy list has become empty. This opens a time window
373          * for disabled buddies to become enabled by sending us a packet.
374          */
375         btr_pushdown(fn->btrn);
376         return 1;
377 success:
378         ret = -E_SYNC_COMPLETE; /* success */
379         goto out;
380 fail:
381         PARA_WARNING_LOG("%s\n", para_strerror(-ret));
382 out:
383         sync_close_buddies(ctx);
384         btr_splice_out_node(&fn->btrn);
385         assert(ret < 0);
386         return ret;
387 }
388
389 /**
390  * The synchronization filter.
391  *
392  * \param f Pointer to the struct to initialize.
393  */
394 void sync_filter_init(struct filter *f)
395 {
396         struct sync_filter_args_info dummy;
397
398         sync_filter_cmdline_parser_init(&dummy);
399         f->open = sync_open;
400         f->close = sync_close;
401         f->pre_select = sync_pre_select;
402         f->post_select = sync_post_select;
403         f->parse_config = sync_parse_config;
404         f->free_config = sync_free_config;
405         f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
406 }