Assorted typo fixes in comments.
[paraslash.git] / sync_filter.c
1 /*
2 * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file sync_filter.c Playback synchronization filter. */
8
9 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <regex.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
14 #include <sys/un.h>
15 #include <netdb.h>
16
17 #include "para.h"
18 #include "sync_filter.cmdline.h"
19 #include "list.h"
20 #include "net.h"
21 #include "sched.h"
22 #include "ggo.h"
23 #include "buffer_tree.h"
24 #include "filter.h"
25 #include "string.h"
26 #include "fd.h"
27 #include "error.h"
28
29 struct sync_buddy_info {
30 const char *url;
31 char *host;
32 int port;
33 struct addrinfo *ai;
34 bool disabled;
35 };
36
37 /* per open/close data */
38 struct sync_buddy {
39 int fd;
40 struct sync_buddy_info *sbi;
41 bool ping_received;
42 struct list_head node;
43 };
44
45 struct sync_filter_context {
46 int listen_fd;
47 struct list_head buddies;
48 struct timeval timeout;
49 bool ping_sent;
50 };
51
52 struct sync_filter_config {
53 struct sync_filter_args_info *conf;
54 struct sync_buddy_info *buddy_info;
55 };
56
57 #define FOR_EACH_BUDDY(_buddy, _list) \
58 list_for_each_entry(_buddy, _list, node)
59 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
60 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
61
62 static void sync_close_buddy(struct sync_buddy *buddy)
63 {
64 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
65 close(buddy->fd);
66 list_del(&buddy->node);
67 free(buddy);
68 }
69
70 static void sync_close_buddies(struct sync_filter_context *ctx)
71 {
72 struct sync_buddy *buddy, *tmp;
73
74 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
75 sync_close_buddy(buddy);
76 }
77
78 static void sync_close(struct filter_node *fn)
79 {
80 struct sync_filter_context *ctx = fn->private_data;
81
82 sync_close_buddies(ctx);
83 if (ctx->listen_fd >= 0) {
84 close(ctx->listen_fd);
85 ctx->listen_fd = -1;
86 }
87 free(ctx);
88 fn->private_data = NULL;
89 }
90
91 static void sync_free_config(void *conf)
92 {
93 struct sync_filter_config *sfc = conf;
94 int i;
95
96 for (i = 0; i < sfc->conf->buddy_given; i++) {
97 free(sfc->buddy_info[i].host);
98 freeaddrinfo(sfc->buddy_info[i].ai);
99 }
100 sync_filter_cmdline_parser_free(sfc->conf);
101 free(sfc);
102 }
103
104 static void sync_open(struct filter_node *fn)
105 {
106 int i, ret;
107 struct sync_filter_config *sfc = fn->conf;
108 struct sync_buddy *buddy;
109 struct sync_filter_context *ctx;
110
111 assert(sfc);
112
113 ctx = fn->private_data = para_calloc(sizeof(*ctx));
114 INIT_LIST_HEAD(&ctx->buddies);
115 ctx->listen_fd = -1;
116
117 /* create socket to listen for incoming packets */
118 ret = makesock(
119 IPPROTO_UDP,
120 true /* passive */,
121 NULL /* no host required */,
122 sfc->conf->port_arg,
123 NULL /* no flowopts */
124 );
125 if (ret < 0) {
126 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
127 sfc->conf->port_arg);
128 return;
129 }
130 ctx->listen_fd = ret;
131 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
132
133 for (i = 0; i < sfc->conf->buddy_given; i++) {
134 struct sync_buddy_info *sbi = sfc->buddy_info + i;
135 const char *url = sfc->conf->buddy_arg[i];
136 int fd;
137
138 /* make buddy udp socket from address info */
139 assert(sbi->ai);
140 ret = makesock_addrinfo(
141 IPPROTO_UDP,
142 false /* not passive */,
143 sbi->ai,
144 NULL /* no flowopts */
145 );
146 if (ret < 0) {
147 PARA_WARNING_LOG("could not make socket for %s\n",
148 url);
149 goto fail;
150 }
151 fd = ret;
152 ret = mark_fd_nonblocking(fd);
153 if (ret < 0) {
154 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
155 url);
156 close(fd);
157 goto fail;
158 }
159 buddy = para_malloc(sizeof(*buddy));
160 buddy->fd = fd;
161 buddy->sbi = sbi;
162 buddy->ping_received = false;
163 para_list_add(&buddy->node, &ctx->buddies);
164
165 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
166 continue;
167 fail:
168 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
169 }
170 }
171
172 static int sync_parse_config(int argc, char **argv, void **result)
173 {
174 int i, ret, n;
175 struct sync_filter_config *sfc;
176 struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
177
178 sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
179 sfc = para_calloc(sizeof(*sfc));
180 sfc->conf = conf;
181 n = conf->buddy_given;
182 sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
183 PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
184 for (i = 0; i < n; i++) {
185 const char *url = conf->buddy_arg[i];
186 size_t len = strlen(url);
187 char *host = para_malloc(len + 1);
188 int port;
189 struct addrinfo *ai;
190 struct sync_buddy_info *sbi = sfc->buddy_info + i;
191
192 if (!parse_url(url, host, len, &port)) {
193 free(host);
194 PARA_ERROR_LOG("could not parse url %s\n", url);
195 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
196 goto fail;
197 }
198 if (port < 0)
199 port = conf->port_arg;
200 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
201 host, port, &ai);
202 if (ret < 0) {
203 PARA_ERROR_LOG("host lookup failure for %s\n", url);
204 free(host);
205 goto fail;
206 }
207 sbi->url = url;
208 sbi->host = host;
209 sbi->port = port;
210 sbi->ai = ai;
211 sbi->disabled = false;
212 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
213 }
214 *result = sfc;
215 return 1;
216 fail:
217 assert(ret < 0);
218 PARA_ERROR_LOG("%s\n", para_strerror(-ret));
219 sync_free_config(sfc);
220 return ret;
221 }
222
223 /*
224 * True if we sent a packet to all buddies and received a packet from each
225 * enabled buddy.
226 */
227 static bool sync_complete(struct sync_filter_context *ctx)
228 {
229 struct sync_buddy *buddy;
230
231 if (!ctx->ping_sent)
232 return false;
233 FOR_EACH_BUDDY(buddy, &ctx->buddies)
234 if (!buddy->sbi->disabled && !buddy->ping_received)
235 return false;
236 return true;
237 }
238
239 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
240 {
241 struct sync_buddy *buddy;
242
243 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
244 if (buddy->sbi->disabled)
245 continue;
246 if (buddy->ping_received == true)
247 continue;
248 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
249 buddy->sbi->disabled = true;
250 }
251 }
252
253 static void sync_set_timeout(struct sync_filter_context *ctx,
254 struct sync_filter_config *sfc)
255 {
256 struct timeval to;
257
258 ms2tv(sfc->conf->timeout_arg, &to);
259 tv_add(now, &to, &ctx->timeout);
260 }
261
262 static void sync_pre_select(struct sched *s, void *context)
263 {
264 int ret;
265 struct filter_node *fn = context;
266 struct sync_filter_context *ctx = fn->private_data;
267 struct sync_filter_config *sfc = fn->conf;
268
269 if (list_empty(&ctx->buddies))
270 return sched_min_delay(s);
271 if (ctx->listen_fd < 0)
272 return sched_min_delay(s);
273 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
274 if (ret < 0)
275 return sched_min_delay(s);
276 para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
277 if (ret == 0)
278 return;
279 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
280 sync_set_timeout(ctx, sfc);
281 return sched_min_delay(s);
282 }
283 if (sync_complete(ctx)) /* push down what we have */
284 return sched_min_delay(s);
285 sched_request_barrier_or_min_delay(&ctx->timeout, s);
286 }
287
288 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
289 struct list_head *list)
290 {
291 struct sync_buddy *buddy;
292
293 FOR_EACH_BUDDY(buddy, list)
294 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
295 return buddy;
296 return NULL;
297 }
298
299 static int sync_post_select(__a_unused struct sched *s, void *context)
300 {
301 int ret;
302 struct filter_node *fn = context;
303 struct sync_filter_context *ctx = fn->private_data;
304 struct sync_filter_config *sfc = fn->conf;
305 struct sync_buddy *buddy, *tmp;
306
307 if (list_empty(&ctx->buddies))
308 goto success;
309 ret = -E_SYNC_LISTEN_FD;
310 if (ctx->listen_fd < 0)
311 goto fail;
312 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
313 if (ret < 0)
314 goto fail;
315 if (ret == 0)
316 return 0;
317 if (ctx->timeout.tv_sec == 0)
318 sync_set_timeout(ctx, sfc);
319 else {
320 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
321 sync_disable_active_buddies(ctx);
322 goto success;
323 }
324 }
325 if (!ctx->ping_sent) {
326 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
327 char c = '\0';
328 PARA_INFO_LOG("pinging %s (%s)\n",
329 buddy->sbi->url, buddy->sbi->disabled?
330 "disabled" : "enabled");
331 ret = xwrite(buddy->fd, &c, 1);
332 if (ret < 0) {
333 PARA_WARNING_LOG("failed to write to %s: %s\n",
334 buddy->sbi->url, para_strerror(-ret));
335 sync_close_buddy(buddy);
336 }
337 }
338 ctx->ping_sent = true;
339 }
340 if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
341 char c;
342 for (;;) {
343 struct sockaddr src_addr;
344 socklen_t len = sizeof(src_addr);
345 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
346 &src_addr, &len);
347 if (ret < 0) {
348 if (errno == EAGAIN || errno == EWOULDBLOCK)
349 break;
350 ret = -ERRNO_TO_PARA_ERROR(errno);
351 goto fail;
352 }
353 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
354 if (!buddy) {
355 PARA_NOTICE_LOG("pinged by unknown\n");
356 continue;
357 }
358 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
359 if (buddy->sbi->disabled) {
360 PARA_NOTICE_LOG("enabling %s\n",
361 buddy->sbi->url);
362 buddy->sbi->disabled = false;
363 }
364 buddy->ping_received = true;
365 }
366 }
367 if (!sync_complete(ctx))
368 return 1;
369 /*
370 * Although all enabled buddies are in sync we do not splice out
371 * ourselves immediately. We rather wait until the timeout expires,
372 * or the buddy list has become empty. This opens a time window
373 * for disabled buddies to become enabled by sending us a packet.
374 */
375 btr_pushdown(fn->btrn);
376 return 1;
377 success:
378 ret = -E_SYNC_COMPLETE; /* success */
379 goto out;
380 fail:
381 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
382 out:
383 sync_close_buddies(ctx);
384 btr_splice_out_node(&fn->btrn);
385 assert(ret < 0);
386 return ret;
387 }
388
389 /**
390 * The synchronization filter.
391 *
392 * \param f Pointer to the struct to initialize.
393 */
394 void sync_filter_init(struct filter *f)
395 {
396 struct sync_filter_args_info dummy;
397
398 sync_filter_cmdline_parser_init(&dummy);
399 f->open = sync_open;
400 f->close = sync_close;
401 f->pre_select = sync_pre_select;
402 f->post_select = sync_post_select;
403 f->parse_config = sync_parse_config;
404 f->free_config = sync_free_config;
405 f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
406 }