2 * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
4 * Licensed under the GPL v2. For licencing details see COPYING.
7 /** \file sync_filter.c Playback synchronization filter. */
9 #include <netinet/in.h>
10 #include <sys/socket.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
18 #include "sync_filter.cmdline.h"
23 #include "buffer_tree.h"
29 struct sync_buddy_info {
37 /* per open/close data */
40 struct sync_buddy_info *sbi;
42 struct list_head node;
45 struct sync_filter_context {
47 struct list_head buddies;
48 struct timeval timeout;
52 struct sync_filter_config {
53 struct sync_filter_args_info *conf;
54 struct sync_buddy_info *buddy_info;
57 #define FOR_EACH_BUDDY(_buddy, _list) \
58 list_for_each_entry(_buddy, _list, node)
59 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
60 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
62 static void sync_close_buddy(struct sync_buddy *buddy)
66 PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
71 static void sync_close_buddies(struct sync_filter_context *ctx)
73 struct sync_buddy *buddy;
75 FOR_EACH_BUDDY(buddy, &ctx->buddies)
76 sync_close_buddy(buddy);
79 static void sync_close(struct filter_node *fn)
81 struct sync_filter_context *ctx = fn->private_data;
83 sync_close_buddies(ctx);
84 if (ctx->listen_fd >= 0) {
85 close(ctx->listen_fd);
89 fn->private_data = NULL;
92 static void sync_free_config(void *conf)
94 struct sync_filter_config *sfc = conf;
97 for (i = 0; i < sfc->conf->buddy_given; i++) {
98 free(sfc->buddy_info[i].host);
99 freeaddrinfo(sfc->buddy_info[i].ai);
101 sync_filter_cmdline_parser_free(sfc->conf);
105 static void sync_open(struct filter_node *fn)
108 struct sync_filter_config *sfc = fn->conf;
109 struct sync_buddy *buddy;
110 struct sync_filter_context *ctx;
114 ctx = fn->private_data = para_calloc(sizeof(*ctx));
115 INIT_LIST_HEAD(&ctx->buddies);
118 /* create socket to listen for incoming packets */
122 NULL /* no host required */,
124 NULL /* no flowopts */
127 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
128 sfc->conf->port_arg);
131 ctx->listen_fd = ret;
132 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
134 for (i = 0; i < sfc->conf->buddy_given; i++) {
135 struct sync_buddy_info *sbi = sfc->buddy_info + i;
136 const char *url = sfc->conf->buddy_arg[i];
139 /* make buddy udp socket from address info */
141 ret = makesock_addrinfo(
143 false /* not passive */,
145 NULL /* no flowopts */
148 PARA_WARNING_LOG("could not make socket for %s\n",
153 ret = mark_fd_nonblocking(fd);
155 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
160 buddy = para_malloc(sizeof(*buddy));
163 buddy->ping_received = false;
164 para_list_add(&buddy->node, &ctx->buddies);
166 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
169 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
173 static int sync_parse_config(int argc, char **argv, void **result)
176 struct sync_filter_config *sfc;
177 struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
179 sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
180 sfc = para_calloc(sizeof(*sfc));
182 n = conf->buddy_given;
183 sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
184 PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
185 for (i = 0; i < n; i++) {
186 const char *url = conf->buddy_arg[i];
187 size_t len = strlen(url);
188 char *host = para_malloc(len + 1);
191 struct sync_buddy_info *sbi = sfc->buddy_info + i;
193 if (!parse_url(url, host, len, &port)) {
195 PARA_ERROR_LOG("could not parse url %s\n", url);
196 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
200 port = conf->port_arg;
201 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
204 PARA_ERROR_LOG("host lookup failure for %s\n", url);
212 sbi->disabled = false;
213 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
219 PARA_ERROR_LOG("%s\n", para_strerror(-ret));
220 sync_free_config(sfc);
225 * True if we sent a packet to all budies and received a packet from each
228 static bool sync_complete(struct sync_filter_context *ctx)
230 struct sync_buddy *buddy;
234 FOR_EACH_BUDDY(buddy, &ctx->buddies)
235 if (!buddy->sbi->disabled && !buddy->ping_received)
240 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
242 struct sync_buddy *buddy;
244 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
245 if (buddy->sbi->disabled)
247 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
248 buddy->sbi->disabled = true;
252 static void sync_set_timeout(struct sync_filter_context *ctx,
253 struct sync_filter_config *sfc)
257 ms2tv(sfc->conf->timeout_arg, &to);
258 tv_add(now, &to, &ctx->timeout);
261 static void sync_pre_select(struct sched *s, void *context)
264 struct filter_node *fn = context;
265 struct sync_filter_context *ctx = fn->private_data;
266 struct sync_filter_config *sfc = fn->conf;
268 if (list_empty(&ctx->buddies))
269 return sched_min_delay(s);
270 if (ctx->listen_fd < 0)
271 return sched_min_delay(s);
272 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
274 return sched_min_delay(s);
275 para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
278 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
279 sync_set_timeout(ctx, sfc);
280 return sched_min_delay(s);
282 if (sync_complete(ctx)) /* push down what we have */
283 return sched_min_delay(s);
284 sched_request_barrier_or_min_delay(&ctx->timeout, s);
287 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
288 struct list_head *list)
290 struct sync_buddy *buddy;
292 FOR_EACH_BUDDY(buddy, list)
293 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
298 static int sync_post_select(__a_unused struct sched *s, void *context)
301 struct filter_node *fn = context;
302 struct sync_filter_context *ctx = fn->private_data;
303 struct sync_filter_config *sfc = fn->conf;
304 struct sync_buddy *buddy, *tmp;
306 if (list_empty(&ctx->buddies))
308 ret = -E_SYNC_LISTEN_FD;
309 if (ctx->listen_fd < 0)
311 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
316 if (ctx->timeout.tv_sec == 0)
317 sync_set_timeout(ctx, sfc);
319 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
320 sync_disable_active_buddies(ctx);
324 if (!ctx->ping_sent) {
325 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
327 PARA_INFO_LOG("pinging %s (%s)\n",
328 buddy->sbi->url, buddy->sbi->disabled?
329 "disabled" : "enabled");
330 ret = xwrite(buddy->fd, &c, 1);
331 sync_close_buddy(buddy);
333 PARA_WARNING_LOG("failed to write to %s: %s\n",
334 buddy->sbi->url, para_strerror(-ret));
335 list_del(&buddy->node);
338 ctx->ping_sent = true;
340 if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
343 struct sockaddr src_addr;
344 socklen_t len = sizeof(src_addr);
345 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
348 if (errno == EAGAIN || errno == EWOULDBLOCK)
350 ret = -ERRNO_TO_PARA_ERROR(errno);
353 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
355 PARA_NOTICE_LOG("pinged by unknown\n");
358 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
359 if (buddy->sbi->disabled) {
360 PARA_NOTICE_LOG("enabling %s\n",
362 buddy->sbi->disabled = false;
364 list_del(&buddy->node);
367 if (!sync_complete(ctx))
370 * Although all enabled buddies are in sync we do not splice out
371 * ourselves immediately. We rather wait until the timout expires,
372 * or the buddy list has become empty. This opens a time window
373 * for disabled buddies to become enabled by sending us a packet.
375 btr_pushdown(fn->btrn);
378 ret = -E_SYNC_COMPLETE; /* success */
381 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
383 sync_close_buddies(ctx);
384 btr_splice_out_node(&fn->btrn);
390 * The synchronization filter.
392 * \param f Pointer to the struct to initialize.
394 void sync_filter_init(struct filter *f)
396 struct sync_filter_args_info dummy;
398 sync_filter_cmdline_parser_init(&dummy);
400 f->close = sync_close;
401 f->pre_select = sync_pre_select;
402 f->post_select = sync_post_select;
403 f->parse_config = sync_parse_config;
404 f->free_config = sync_free_config;
405 f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);