2 * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
4 * Licensed under the GPL v2. For licencing details see COPYING.
7 /** \file sync_filter.c Playback synchronization filter. */
9 #include <netinet/in.h>
10 #include <sys/socket.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
18 #include "filter_cmd.lsg.h"
23 #include "buffer_tree.h"
/*
 * Per-buddy configuration record. The code in this file accesses its ->url,
 * ->ai and ->disabled members.
 */
29 struct sync_buddy_info {
37 /* One active buddy */
/* Configuration record of this buddy; sync_open() points it into the sbi array. */
40 struct sync_buddy_info *sbi;
/* Links this buddy into the ->buddies list of the filter context. */
42 struct list_head node;
45 /* Allocated in ->open(), stored in fn->private_data */
46 struct sync_filter_context {
/* List of struct sync_buddy entries, linked through their ->node member. */
48 struct list_head buddies;
/*
 * Deadline of the current synchronization attempt. The select hooks treat
 * tv_sec == 0 as "not set yet" and call sync_set_timeout() in that case.
 */
49 struct timeval timeout;
/* Iterate over all buddies of the given list, following the ->node member. */
53 #define FOR_EACH_BUDDY(_buddy, _list) \
54 list_for_each_entry(_buddy, _list, node)
/*
 * Variant taking an extra cursor. This file uses it wherever the loop body
 * may call sync_close_buddy(), which unlinks the current entry.
 */
55 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
56 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
/*
 * Shut down a single buddy: log its url and file descriptor and unlink it
 * from the buddy list it is on.
 *
 * NOTE(review): presumably the descriptor is also closed and the structure
 * freed here -- confirm, the corresponding statements are not shown.
 */
58 static void sync_close_buddy(struct sync_buddy *buddy)
60 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
62 list_del(&buddy->node);
/*
 * Close every buddy on ctx->buddies.
 *
 * The _SAFE iterator is needed because sync_close_buddy() removes the entry
 * currently being visited from the list.
 */
66 static void sync_close_buddies(struct sync_filter_context *ctx)
68 struct sync_buddy *buddy, *tmp;
70 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
71 sync_close_buddy(buddy);
/*
 * Release the per-instance state of a sync filter node: close all buddies,
 * close the listening socket if one was opened, and detach the context from
 * the filter node.
 */
74 static void sync_close(struct filter_node *fn)
76 struct sync_filter_context *ctx = fn->private_data;
78 sync_close_buddies(ctx);
/* A negative listen_fd means there is no listening socket to close. */
79 if (ctx->listen_fd >= 0) {
80 close(ctx->listen_fd);
84 fn->private_data = NULL;
/*
 * Teardown hook of the filter (registered as .teardown below).
 *
 * lpr:  parse result, used to determine how often the BUDDY option was given.
 * conf: the struct sync_buddy_info array, one entry per given BUDDY option.
 *
 * Hands the address info of every entry back to freeaddrinfo().
 */
87 static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
89 struct sync_buddy_info *sbi = conf;
90 int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
92 for (i = 0; i < num_buddies; i++) {
94 freeaddrinfo(sbi[i].ai);
/*
 * Set up the per-instance state of the sync filter: allocate the context,
 * create the UDP listening socket and create one socket per buddy given via
 * the BUDDY option. Each buddy for which a socket was set up is appended to
 * ctx->buddies.
 */
99 static void sync_open(struct filter_node *fn)
102 struct sync_buddy *buddy;
103 struct sync_filter_context *ctx;
/* Presumably the array built by sync_setup(), indexed like the BUDDY option values. */
104 struct sync_buddy_info *sbi = fn->conf;
105 uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
106 unsigned buddy_given;
107 const struct lls_opt_result *r_b;
/* The other callbacks find the context again through fn->private_data. */
109 ctx = fn->private_data = para_calloc(sizeof(*ctx));
110 INIT_LIST_HEAD(&ctx->buddies);
112 /* create socket to listen for incoming packets */
116 NULL /* no host required */,
118 NULL /* no flowopts */
121 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
/* From here on listen_fd holds the descriptor that sync_close() closes. */
125 ctx->listen_fd = ret;
126 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
128 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
129 buddy_given = lls_opt_given(r_b);
/* One iteration per BUDDY option given; i also indexes the sbi array. */
130 for (i = 0; i < buddy_given; i++) {
132 const char *url = lls_string_val(i, r_b);
134 /* make buddy udp socket from address info */
136 ret = makesock_addrinfo(
138 false /* not passive */,
140 NULL /* no flowopts */
143 PARA_WARNING_LOG("could not make socket for %s\n",
148 ret = mark_fd_nonblocking(fd);
150 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
155 buddy = para_malloc(sizeof(*buddy));
/* Tie the new buddy to its configuration entry; no ping seen from it so far. */
157 buddy->sbi = sbi + i;
158 buddy->ping_received = false;
159 para_list_add(&buddy->node, &ctx->buddies);
161 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
/* Report the error code left in ret as a warning. */
164 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
169 * Build an array of struct sync_buddy_info with one entry for each buddy given
170 * in the arguments. This array is not affected by sync_close(), so information
171 * stored there can be used for multiple instances (para_audiod). We store the
172 * resolved url and the ->disabled bit in this array.
/*
 * lpr: parse result from which the BUDDY and PORT options are read.
 *
 * NOTE(review): presumably returns the sbi array, which then arrives as
 * fn->conf in sync_open() and as conf in sync_teardown() -- confirm.
 */
174 static void *sync_setup(const struct lls_parse_result *lpr)
178 struct sync_buddy_info *sbi;
179 const struct lls_opt_result *r_b;
181 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
/* n is the number of times the BUDDY option was given. */
182 n = lls_opt_given(r_b);
183 sbi = para_malloc(n * sizeof(*sbi));
184 PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
185 for (i = 0; i < n; i++) {
186 const char *url = lls_string_val(i, r_b);
187 size_t len = strlen(url);
/* Scratch buffer for the host part; sized to hold the whole url plus a terminator. */
188 char *host = para_malloc(len + 1);
192 if (!parse_url(url, host, len, &port)) {
193 PARA_ERROR_LOG("could not parse url %s\n", url);
/*
 * NOTE(review): presumably this fallback to the PORT option applies only
 * when the url did not contain a port -- confirm the guarding condition.
 */
197 port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
198 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
201 PARA_ERROR_LOG("host lookup failure for %s: %s\n",
202 url, para_strerror(-ret));
/* Every buddy starts out enabled. */
209 sbi[i].disabled = false;
210 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
216 * True if we sent a packet to all buddies and received a packet from each
219 static bool sync_complete(struct sync_filter_context *ctx)
221 struct sync_buddy *buddy;
225 FOR_EACH_BUDDY(buddy, &ctx->buddies)
/* Only an enabled buddy that has not pinged us yet matches; disabled ones are ignored. */
226 if (!buddy->sbi->disabled && !buddy->ping_received)
/*
 * Called from sync_post_select() once the timeout has expired: set the
 * ->disabled bit of buddies we are still waiting for. The bit lives in the
 * sync_buddy_info entry, not in the buddy itself.
 */
231 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
233 struct sync_buddy *buddy;
235 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
/* NOTE(review): presumably already-disabled buddies are skipped here -- confirm. */
236 if (buddy->sbi->disabled)
/* NOTE(review): presumably buddies which did ping us are skipped as well -- confirm. */
238 if (buddy->ping_received == true)
240 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
241 buddy->sbi->disabled = true;
/*
 * Initialize ctx->timeout from the TIMEOUT option, which is read as a number
 * of milliseconds.
 *
 * NOTE(review): presumably "to" is that value converted to a struct timeval,
 * so that ctx->timeout becomes the current time plus the timeout -- confirm.
 */
245 static void sync_set_timeout(struct sync_filter_context *ctx,
246 struct lls_parse_result *lpr)
248 uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
252 tv_add(now, &to, &ctx->timeout);
/*
 * Pre-select hook of the filter (registered as .pre_select below).
 *
 * s:       the scheduler instance.
 * context: the filter node; its private_data is the sync filter context.
 *
 * Requests a minimal delay whenever sync_post_select() has something to do
 * right away; otherwise watches the listening socket and asks the scheduler
 * not to sleep past ctx->timeout.
 */
255 static void sync_pre_select(struct sched *s, void *context)
258 struct filter_node *fn = context;
259 struct sync_filter_context *ctx = fn->private_data;
/* No buddies, or no listening socket: do not block in select. */
261 if (list_empty(&ctx->buddies))
262 return sched_min_delay(s);
263 if (ctx->listen_fd < 0)
264 return sched_min_delay(s);
265 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
267 return sched_min_delay(s);
/* Watch the listening socket for incoming pings. */
268 para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
271 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
272 sync_set_timeout(ctx, fn->lpr);
273 return sched_min_delay(s);
275 if (sync_complete(ctx)) /* push down what we have */
276 return sched_min_delay(s);
277 sched_request_barrier_or_min_delay(&ctx->timeout, s);
/*
 * Search the given buddy list for the entry whose resolved address
 * (sbi->ai->ai_addr) is reported as equal to addr by sockaddr_equal().
 *
 * NOTE(review): presumably returns the matching buddy, or NULL if there is
 * none -- confirm, the return statements are not shown.
 */
280 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
281 struct list_head *list)
283 struct sync_buddy *buddy;
285 FOR_EACH_BUDDY(buddy, list)
286 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
/*
 * Post-select hook of the filter (registered as .post_select below).
 *
 * s:       the scheduler instance; its read fd set is checked for the
 *          listening socket.
 * context: the filter node; its private_data is the sync filter context.
 *
 * Sends a one-byte ping to every buddy once, records pings received on the
 * listening socket, and disables unresponsive buddies when the timeout
 * expires. On completion or error the buddies are closed and the node is
 * spliced out of the buffer tree.
 */
291 static int sync_post_select(__a_unused struct sched *s, void *context)
294 struct filter_node *fn = context;
295 struct sync_filter_context *ctx = fn->private_data;
296 struct sync_buddy *buddy, *tmp;
298 if (list_empty(&ctx->buddies))
/* Error code for the case that there is no listening socket. */
300 ret = -E_SYNC_LISTEN_FD;
301 if (ctx->listen_fd < 0)
303 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
/* tv_sec == 0 means the timeout has not been set up yet. */
308 if (ctx->timeout.tv_sec == 0)
309 sync_set_timeout(ctx, fn->lpr);
/* Timeout expired: stop waiting for buddies which have not answered. */
311 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
312 sync_disable_active_buddies(ctx);
/* Ping each buddy once; _SAFE because a failed write closes the buddy. */
316 if (!ctx->ping_sent) {
317 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
319 PARA_INFO_LOG("pinging %s (%s)\n",
320 buddy->sbi->url, buddy->sbi->disabled?
321 "disabled" : "enabled");
322 ret = xwrite(buddy->fd, &c, 1);
324 PARA_WARNING_LOG("failed to write to %s: %s\n",
325 buddy->sbi->url, para_strerror(-ret));
326 sync_close_buddy(buddy);
329 ctx->ping_sent = true;
331 if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
/*
 * NOTE(review): struct sockaddr may be too small to hold the source address
 * of a non-IPv4 sender; struct sockaddr_storage is the type sized for any
 * address family -- confirm whether truncated addresses can reach
 * sync_find_buddy().
 */
334 struct sockaddr src_addr;
335 socklen_t len = sizeof(src_addr);
336 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
/* EAGAIN/EWOULDBLOCK are handled separately from all other errno values. */
339 if (errno == EAGAIN || errno == EWOULDBLOCK)
341 ret = -ERRNO_TO_PARA_ERROR(errno);
/* Map the sender address to one of our buddies. */
344 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
346 PARA_NOTICE_LOG("pinged by unknown\n");
349 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
/* A ping from a disabled buddy re-enables it. */
350 if (buddy->sbi->disabled) {
351 PARA_NOTICE_LOG("enabling %s\n",
353 buddy->sbi->disabled = false;
355 buddy->ping_received = true;
358 if (!sync_complete(ctx))
361 * Although all enabled buddies are in sync we do not splice out
362 * ourselves immediately. We rather wait until the timeout expires,
363 * or the buddy list has become empty. This opens a time window
364 * for disabled buddies to become enabled by sending us a packet.
366 btr_pushdown(fn->btrn);
369 ret = -E_SYNC_COMPLETE; /* success */
372 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
/* Final cleanup: drop all buddies and remove this node from the buffer tree. */
374 sync_close_buddies(ctx);
375 btr_splice_out_node(&fn->btrn);
/*
 * Callback table of the sync filter. The members visible here wire up the
 * select hooks and the teardown function defined above.
 */
380 const struct filter lsg_filter_cmd_com_sync_user_data = {
383 .pre_select = sync_pre_select,
384 .post_select = sync_post_select,
386 .teardown = sync_teardown