1 /* Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
3 /** \file sync_filter.c Playback synchronization filter. */
5 #include <netinet/in.h>
6 #include <sys/socket.h>
14 #include "filter_cmd.lsg.h"
19 #include "buffer_tree.h"
/*
 * Per-buddy information. Code in this file accesses the members ->url, ->ai
 * and ->disabled through pointers of this type; the member declarations
 * themselves are not part of this view.
 */
25 struct sync_buddy_info {
33 /* One active buddy */
/* The info entry this buddy refers to (sync_open() sets it to sbi + i). */
36 struct sync_buddy_info *sbi;
/* Position of this buddy in the ->buddies list of the filter context. */
38 struct list_head node;
41 /* Allocated in ->open(), stored in fn->private_data */
42 struct sync_filter_context {
/* List of struct sync_buddy, linked through their ->node member. */
44 struct list_head buddies;
/*
 * Point in time which is compared against the current time in the monitor
 * callbacks. A zero tv_sec is treated as "not set yet".
 */
45 struct timeval timeout;
/* Iterate over all buddies of a list, following the ->node member. */
49 #define FOR_EACH_BUDDY(_buddy, _list) \
50 list_for_each_entry(_buddy, _list, node)
/*
 * Variant which takes an additional temporary cursor. This file uses it for
 * the loops in which sync_close_buddy() unlinks the current entry.
 */
51 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
52 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
/*
 * Shut down a single buddy: log its url and file descriptor at debug level
 * and unlink it from the list it is on.
 *
 * NOTE(review): only part of the body is visible here. Closing buddy->fd and
 * freeing the buddy presumably happen on the lines not shown -- confirm.
 */
54 static void sync_close_buddy(struct sync_buddy *buddy)
56 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
/* Unlink the buddy so that list walkers no longer see it. */
58 list_del(&buddy->node);
/* Call sync_close_buddy() for every buddy on ctx->buddies. */
62 static void sync_close_buddies(struct sync_filter_context *ctx)
64 struct sync_buddy *buddy, *tmp;
/* The safe iterator is needed because each call unlinks the current entry. */
66 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
67 sync_close_buddy(buddy);
/*
 * Release the per-instance state stored in fn->private_data: close all
 * buddies, close the listening socket if there is one, and reset
 * fn->private_data to NULL.
 *
 * NOTE(review): freeing the context itself is presumably done on a line not
 * shown here -- confirm.
 */
70 static void sync_close(struct filter_node *fn)
72 struct sync_filter_context *ctx = fn->private_data;
74 sync_close_buddies(ctx);
/* A negative listen_fd means there is no listening socket to close. */
75 if (ctx->listen_fd >= 0) {
76 close(ctx->listen_fd);
80 fn->private_data = NULL;
/*
 * Release the resources held by the buddy info array passed as conf. The
 * array is walked with one iteration per occurrence of the buddy option in
 * lpr.
 *
 * NOTE(review): further per-entry cleanup and the release of the array itself
 * may sit on lines which are not visible here.
 */
83 static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
85 struct sync_buddy_info *sbi = conf;
86 int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
88 for (i = 0; i < num_buddies; i++) {
/* Free the addrinfo list stored in the ->ai member of this entry. */
90 freeaddrinfo(sbi[i].ai);
/*
 * Set up the per-instance state of the sync filter: allocate the context,
 * create the socket on which incoming packets are received, and create one
 * socket and one struct sync_buddy for each buddy option that was given.
 *
 * NOTE(review): several lines of this function (declarations, the calls that
 * create the sockets, error branches) are not visible in this view, so the
 * comments below only cover what the visible lines show.
 */
95 static void sync_open(struct filter_node *fn)
98 struct sync_buddy *buddy;
99 struct sync_filter_context *ctx;
/* The buddy info array is taken from fn->conf, not created here. */
100 struct sync_buddy_info *sbi = fn->conf;
101 uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
102 unsigned buddy_given;
103 const struct lls_opt_result *r_b;
/* Allocate the context and make it reachable through fn->private_data. */
105 ctx = fn->private_data = zalloc(sizeof(*ctx));
106 init_list_head(&ctx->buddies);
108 /* create socket to listen for incoming packets */
112 NULL /* no host required */,
114 NULL /* no flowopts */
117 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
/* At this point ret holds the file descriptor of the listening socket. */
121 ctx->listen_fd = ret;
122 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
/* One loop iteration per occurrence of the buddy option. */
124 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
125 buddy_given = lls_opt_given(r_b);
126 for (i = 0; i < buddy_given; i++) {
128 const char *url = lls_string_val(i, r_b);
130 /* make buddy udp socket from address info */
132 ret = makesock_addrinfo(
134 false /* not passive */,
136 NULL /* no flowopts */
139 PARA_WARNING_LOG("could not make socket for %s\n",
144 ret = mark_fd_nonblocking(fd);
146 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
/*
 * Create the list entry for this buddy. It refers to the i-th element of
 * the info array and starts with no ping received.
 */
151 buddy = alloc(sizeof(*buddy));
153 buddy->sbi = sbi + i;
154 buddy->ping_received = false;
155 para_list_add(&buddy->node, &ctx->buddies);
157 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
160 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
165 * Build an array of struct sync_buddy_info with one entry for each buddy given
166 * in the arguments. This array is not affected by sync_close(), so information
167 * stored there can be used for multiple instances (para_audiod). We store the
168 * resolved url and the ->disabled bit in this array.
170 static void *sync_setup(const struct lls_parse_result *lpr)
174 struct sync_buddy_info *sbi;
175 const struct lls_opt_result *r_b;
/* The info array gets one element per occurrence of the buddy option. */
177 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
178 n = lls_opt_given(r_b);
179 sbi = arr_alloc(n, sizeof(*sbi));
180 PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
181 for (i = 0; i < n; i++) {
182 const char *url = lls_string_val(i, r_b);
183 size_t len = strlen(url);
/*
 * Buffer handed to parse_url() below. With len + 1 bytes it is large
 * enough for a copy of the whole url including the terminating NUL.
 */
184 char *host = alloc(len + 1);
188 if (!parse_url(url, host, len, &port)) {
189 PARA_ERROR_LOG("could not parse url %s\n", url);
/*
 * Use the value of the port option.
 * NOTE(review): the condition under which this assignment is executed is
 * on a line not visible here -- presumably only if the url contained no
 * port; confirm.
 */
193 port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
194 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
197 PARA_ERROR_LOG("host lookup failure for %s: %s\n",
198 url, para_strerror(-ret));
/* Each entry starts out enabled. */
205 sbi[i].disabled = false;
206 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
212 * True if we sent a packet to all buddies and received a packet from each
215 static bool sync_complete(struct sync_filter_context *ctx)
217 struct sync_buddy *buddy;
/*
 * Look for a buddy which is enabled but from which no ping has been
 * received yet. The statements which produce the return value are not
 * visible in this view.
 */
221 FOR_EACH_BUDDY(buddy, &ctx->buddies)
222 if (!buddy->sbi->disabled && !buddy->ping_received)
/*
 * Walk the buddy list and set the ->disabled flag in the info entry of
 * buddies, logging each one at notice level.
 *
 * NOTE(review): the statements controlled by the two if-conditions are not
 * visible here. Presumably both are "continue", so that only buddies which
 * are still enabled and have not pinged us get disabled -- confirm.
 */
227 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
229 struct sync_buddy *buddy;
231 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
232 if (buddy->sbi->disabled)
234 if (buddy->ping_received == true)
236 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
/* The flag is stored in the info entry, not in the struct sync_buddy. */
237 buddy->sbi->disabled = true;
/*
 * Compute ctx->timeout from the current time and the value of the timeout
 * option.
 *
 * NOTE(review): the conversion of ms into the timeval "to" is on lines not
 * shown here; judging by the variable name the option value is presumably in
 * milliseconds -- confirm.
 */
241 static void sync_set_timeout(struct sync_filter_context *ctx,
242 struct lls_parse_result *lpr)
244 uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
/* Presumably stores now + to in ctx->timeout -- confirm against tv_add(). */
248 tv_add(now, &to, &ctx->timeout);
/*
 * Scheduler pre-monitor callback of the sync filter. context is the filter
 * node. Depending on the state this either asks for a minimal delay or
 * registers the listening socket and the timeout with the scheduler.
 */
251 static void sync_pre_monitor(struct sched *s, void *context)
254 struct filter_node *fn = context;
255 struct sync_filter_context *ctx = fn->private_data;
/* Without buddies or without a listening socket: request a minimal delay. */
257 if (list_empty(&ctx->buddies))
258 return sched_min_delay(s);
259 if (ctx->listen_fd < 0)
260 return sched_min_delay(s);
/* The condition on ret which guards the next return is not visible here. */
261 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
263 return sched_min_delay(s);
/* Ask the scheduler to watch the listening socket for reading. */
264 sched_monitor_readfd(ctx->listen_fd, s);
267 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
268 sync_set_timeout(ctx, fn->lpr);
269 return sched_min_delay(s);
271 if (sync_complete(ctx)) /* push down what we have */
272 return sched_min_delay(s);
/* Otherwise hand the timeout to the scheduler. */
273 sched_request_barrier_or_min_delay(&ctx->timeout, s);
/*
 * Search the given buddy list for the buddy whose address (the ai_addr of its
 * info entry) equals addr according to sockaddr_equal().
 *
 * NOTE(review): the return statements are not visible here. Presumably the
 * matching buddy is returned, or NULL if there is none -- confirm.
 */
276 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
277 struct list_head *list)
279 struct sync_buddy *buddy;
281 FOR_EACH_BUDDY(buddy, list)
282 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
/*
 * Scheduler post-monitor callback of the sync filter. context is the filter
 * node. The visible lines do the following: arm the timeout if it is not set,
 * disable buddies once the timeout has passed, send a one-byte ping to each
 * buddy (once), read one byte from the listening socket and mark the sending
 * buddy as having pinged, and finally push down the buffers, close all
 * buddies and splice out the node.
 *
 * NOTE(review): many control-flow lines (labels, goto/return statements, some
 * conditions) are not visible in this view, so the exact paths between these
 * steps cannot be stated from here.
 */
287 static int sync_post_monitor(__a_unused struct sched *s, void *context)
290 struct filter_node *fn = context;
291 struct sync_filter_context *ctx = fn->private_data;
292 struct sync_buddy *buddy, *tmp;
294 if (list_empty(&ctx->buddies))
/* ret is preset to the error code used when there is no listening socket. */
296 ret = -E_SYNC_LISTEN_FD;
297 if (ctx->listen_fd < 0)
299 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
/* A zero tv_sec means that the timeout has not been set yet. */
304 if (ctx->timeout.tv_sec == 0)
305 sync_set_timeout(ctx, fn->lpr);
/* Presumably true once the timeout lies in the past -- confirm tv_diff(). */
307 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
308 sync_disable_active_buddies(ctx);
/*
 * Ping every buddy by writing one byte to its socket. This happens only
 * while ->ping_sent is unset. The safe iterator is used because
 * sync_close_buddy() unlinks the entry it is called for.
 */
312 if (!ctx->ping_sent) {
313 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
315 PARA_INFO_LOG("pinging %s (%s)\n",
316 buddy->sbi->url, buddy->sbi->disabled?
317 "disabled" : "enabled");
318 ret = xwrite(buddy->fd, &c, 1);
320 PARA_WARNING_LOG("failed to write to %s: %s\n",
321 buddy->sbi->url, para_strerror(-ret));
322 sync_close_buddy(buddy);
325 ctx->ping_sent = true;
/* Read one byte from the listening socket without blocking. */
327 if (sched_read_ok(ctx->listen_fd, s)) {
330 struct sockaddr src_addr;
331 socklen_t len = sizeof(src_addr);
332 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
/* EAGAIN and EWOULDBLOCK are handled separately from other errors. */
335 if (errno == EAGAIN || errno == EWOULDBLOCK)
337 ret = -ERRNO_TO_PARA_ERROR(errno);
/* Map the sender address to one of our buddies. */
340 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
342 PARA_NOTICE_LOG("pinged by unknown\n");
345 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
/* A disabled buddy becomes enabled again when it pings us. */
346 if (buddy->sbi->disabled) {
347 PARA_NOTICE_LOG("enabling %s\n",
349 buddy->sbi->disabled = false;
351 buddy->ping_received = true;
354 if (!sync_complete(ctx))
357 * Although all enabled buddies are in sync we do not splice out
358 * ourselves immediately. We rather wait until the timeout expires,
359 * or the buddy list has become empty. This opens a time window
360 * for disabled buddies to become enabled by sending us a packet.
362 btr_pushdown(fn->btrn);
365 ret = -E_SYNC_COMPLETE; /* success */
369 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
/* Close all buddies and remove this node from the buffer tree. */
371 sync_close_buddies(ctx);
372 btr_splice_out_node(&fn->btrn);
/*
 * The struct filter of the sync filter. The visible members install the
 * pre/post monitor callbacks and the teardown function defined in this file;
 * further members are set on lines not visible here.
 */
377 const struct filter lsg_filter_cmd_com_sync_user_data = {
380 .pre_monitor = sync_pre_monitor,
381 .post_monitor = sync_post_monitor,
383 .teardown = sync_teardown