1 /* Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
3 /** \file sync_filter.c Playback synchronization filter. */
5 #include <netinet/in.h>
6 #include <sys/socket.h>
14 #include "filter_cmd.lsg.h"
19 #include "buffer_tree.h"
25 struct sync_buddy_info
{
33 /* One active buddy */
36 struct sync_buddy_info
*sbi
;
38 struct list_head node
;
41 /* Allocated in ->open(), stored in fn->private_data */
42 struct sync_filter_context
{
44 struct list_head buddies
;
45 struct timeval timeout
;
49 #define FOR_EACH_BUDDY(_buddy, _list) \
50 list_for_each_entry(_buddy, _list, node)
51 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
52 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
54 static void sync_close_buddy(struct sync_buddy
*buddy
)
56 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy
->sbi
->url
, buddy
->fd
);
58 list_del(&buddy
->node
);
62 static void sync_close_buddies(struct sync_filter_context
*ctx
)
64 struct sync_buddy
*buddy
, *tmp
;
66 FOR_EACH_BUDDY_SAFE(buddy
, tmp
, &ctx
->buddies
)
67 sync_close_buddy(buddy
);
70 static void sync_close(struct filter_node
*fn
)
72 struct sync_filter_context
*ctx
= fn
->private_data
;
74 sync_close_buddies(ctx
);
75 if (ctx
->listen_fd
>= 0) {
76 close(ctx
->listen_fd
);
80 fn
->private_data
= NULL
;
83 static void sync_teardown(const struct lls_parse_result
*lpr
, void *conf
)
85 struct sync_buddy_info
*sbi
= conf
;
86 int i
, num_buddies
= FILTER_CMD_OPT_GIVEN(SYNC
, BUDDY
, lpr
);
88 for (i
= 0; i
< num_buddies
; i
++) {
90 freeaddrinfo(sbi
[i
].ai
);
95 static void sync_open(struct filter_node
*fn
)
98 struct sync_buddy
*buddy
;
99 struct sync_filter_context
*ctx
;
100 struct sync_buddy_info
*sbi
= fn
->conf
;
101 uint32_t port
= FILTER_CMD_OPT_UINT32_VAL(SYNC
, PORT
, fn
->lpr
);
102 unsigned buddy_given
;
103 const struct lls_opt_result
*r_b
;
105 ctx
= fn
->private_data
= para_calloc(sizeof(*ctx
));
106 INIT_LIST_HEAD(&ctx
->buddies
);
108 /* create socket to listen for incoming packets */
112 NULL
/* no host required */,
114 NULL
/* no flowopts */
117 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
121 ctx
->listen_fd
= ret
;
122 PARA_INFO_LOG("listening on fd %d\n", ctx
->listen_fd
);
124 r_b
= FILTER_CMD_OPT_RESULT(SYNC
, BUDDY
, fn
->lpr
);
125 buddy_given
= lls_opt_given(r_b
);
126 for (i
= 0; i
< buddy_given
; i
++) {
128 const char *url
= lls_string_val(i
, r_b
);
130 /* make buddy udp socket from address info */
132 ret
= makesock_addrinfo(
134 false /* not passive */,
136 NULL
/* no flowopts */
139 PARA_WARNING_LOG("could not make socket for %s\n",
144 ret
= mark_fd_nonblocking(fd
);
146 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
151 buddy
= para_malloc(sizeof(*buddy
));
153 buddy
->sbi
= sbi
+ i
;
154 buddy
->ping_received
= false;
155 para_list_add(&buddy
->node
, &ctx
->buddies
);
157 PARA_INFO_LOG("opened buddy %s on fd %d\n", url
, fd
);
160 PARA_WARNING_LOG("%s\n", para_strerror(-ret
));
165 * Build an array of struct sync_buddy_info with one entry for each buddy given
166 * in the arguments. This array is not affected by sync_close(), so information
167 * stored there can be used for multiple instances (para_audiod). We store the
168 * resolved url and the ->disabled bit in this array.
170 static void *sync_setup(const struct lls_parse_result
*lpr
)
174 struct sync_buddy_info
*sbi
;
175 const struct lls_opt_result
*r_b
;
177 r_b
= FILTER_CMD_OPT_RESULT(SYNC
, BUDDY
, lpr
);
178 n
= lls_opt_given(r_b
);
179 sbi
= para_malloc(n
* sizeof(*sbi
));
180 PARA_INFO_LOG("initializing buddy info array of length %u\n", n
);
181 for (i
= 0; i
< n
; i
++) {
182 const char *url
= lls_string_val(i
, r_b
);
183 size_t len
= strlen(url
);
184 char *host
= para_malloc(len
+ 1);
188 if (!parse_url(url
, host
, len
, &port
)) {
189 PARA_ERROR_LOG("could not parse url %s\n", url
);
193 port
= FILTER_CMD_OPT_UINT32_VAL(SYNC
, PORT
, lpr
);
194 ret
= lookup_address(IPPROTO_UDP
, false /* not passive */,
197 PARA_ERROR_LOG("host lookup failure for %s: %s\n",
198 url
, para_strerror(-ret
));
205 sbi
[i
].disabled
= false;
206 PARA_DEBUG_LOG("buddy #%d: %s\n", i
, url
);
212 * True if we sent a packet to all buddies and received a packet from each
215 static bool sync_complete(struct sync_filter_context
*ctx
)
217 struct sync_buddy
*buddy
;
221 FOR_EACH_BUDDY(buddy
, &ctx
->buddies
)
222 if (!buddy
->sbi
->disabled
&& !buddy
->ping_received
)
227 static void sync_disable_active_buddies(struct sync_filter_context
*ctx
)
229 struct sync_buddy
*buddy
;
231 FOR_EACH_BUDDY(buddy
, &ctx
->buddies
) {
232 if (buddy
->sbi
->disabled
)
234 if (buddy
->ping_received
== true)
236 PARA_NOTICE_LOG("disabling %s\n", buddy
->sbi
->url
);
237 buddy
->sbi
->disabled
= true;
241 static void sync_set_timeout(struct sync_filter_context
*ctx
,
242 struct lls_parse_result
*lpr
)
244 uint32_t ms
= FILTER_CMD_OPT_UINT32_VAL(SYNC
, TIMEOUT
, lpr
);
248 tv_add(now
, &to
, &ctx
->timeout
);
251 static void sync_pre_select(struct sched
*s
, void *context
)
254 struct filter_node
*fn
= context
;
255 struct sync_filter_context
*ctx
= fn
->private_data
;
257 if (list_empty(&ctx
->buddies
))
258 return sched_min_delay(s
);
259 if (ctx
->listen_fd
< 0)
260 return sched_min_delay(s
);
261 ret
= btr_node_status(fn
->btrn
, 0, BTR_NT_INTERNAL
);
263 return sched_min_delay(s
);
264 para_fd_set(ctx
->listen_fd
, &s
->rfds
, &s
->max_fileno
);
267 if (ctx
->timeout
.tv_sec
== 0) { /* must ping buddies */
268 sync_set_timeout(ctx
, fn
->lpr
);
269 return sched_min_delay(s
);
271 if (sync_complete(ctx
)) /* push down what we have */
272 return sched_min_delay(s
);
273 sched_request_barrier_or_min_delay(&ctx
->timeout
, s
);
276 static struct sync_buddy
*sync_find_buddy(struct sockaddr
*addr
,
277 struct list_head
*list
)
279 struct sync_buddy
*buddy
;
281 FOR_EACH_BUDDY(buddy
, list
)
282 if (sockaddr_equal(buddy
->sbi
->ai
->ai_addr
, addr
))
287 static int sync_post_select(__a_unused
struct sched
*s
, void *context
)
290 struct filter_node
*fn
= context
;
291 struct sync_filter_context
*ctx
= fn
->private_data
;
292 struct sync_buddy
*buddy
, *tmp
;
294 if (list_empty(&ctx
->buddies
))
296 ret
= -E_SYNC_LISTEN_FD
;
297 if (ctx
->listen_fd
< 0)
299 ret
= btr_node_status(fn
->btrn
, 0, BTR_NT_INTERNAL
);
304 if (ctx
->timeout
.tv_sec
== 0)
305 sync_set_timeout(ctx
, fn
->lpr
);
307 if (tv_diff(&ctx
->timeout
, now
, NULL
) < 0) {
308 sync_disable_active_buddies(ctx
);
312 if (!ctx
->ping_sent
) {
313 FOR_EACH_BUDDY_SAFE(buddy
, tmp
, &ctx
->buddies
) {
315 PARA_INFO_LOG("pinging %s (%s)\n",
316 buddy
->sbi
->url
, buddy
->sbi
->disabled
?
317 "disabled" : "enabled");
318 ret
= xwrite(buddy
->fd
, &c
, 1);
320 PARA_WARNING_LOG("failed to write to %s: %s\n",
321 buddy
->sbi
->url
, para_strerror(-ret
));
322 sync_close_buddy(buddy
);
325 ctx
->ping_sent
= true;
327 if (FD_ISSET(ctx
->listen_fd
, &s
->rfds
)) {
330 struct sockaddr src_addr
;
331 socklen_t len
= sizeof(src_addr
);
332 ret
= recvfrom(ctx
->listen_fd
, &c
, 1, MSG_DONTWAIT
,
335 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
337 ret
= -ERRNO_TO_PARA_ERROR(errno
);
340 buddy
= sync_find_buddy(&src_addr
, &ctx
->buddies
);
342 PARA_NOTICE_LOG("pinged by unknown\n");
345 PARA_DEBUG_LOG("pinged by %s\n", buddy
->sbi
->url
);
346 if (buddy
->sbi
->disabled
) {
347 PARA_NOTICE_LOG("enabling %s\n",
349 buddy
->sbi
->disabled
= false;
351 buddy
->ping_received
= true;
354 if (!sync_complete(ctx
))
357 * Although all enabled buddies are in sync we do not splice out
358 * ourselves immediately. We rather wait until the timeout expires,
359 * or the buddy list has become empty. This opens a time window
360 * for disabled buddies to become enabled by sending us a packet.
362 btr_pushdown(fn
->btrn
);
365 ret
= -E_SYNC_COMPLETE
; /* success */
368 PARA_WARNING_LOG("%s\n", para_strerror(-ret
));
370 sync_close_buddies(ctx
);
371 btr_splice_out_node(&fn
->btrn
);
376 const struct filter lsg_filter_cmd_com_sync_user_data
= {
379 .pre_select
= sync_pre_select
,
380 .post_select
= sync_post_select
,
382 .teardown
= sync_teardown