2 * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
4 * Licensed under the GPL v2. For licencing details see COPYING.
7 /** \file sync_filter.c Playback synchronization filter. */
9 #include <netinet/in.h>
10 #include <sys/socket.h>
12 #include <sys/types.h>
13 #include <arpa/inet.h>
18 #include "sync_filter.cmdline.h"
23 #include "buffer_tree.h"
29 struct sync_buddy_info
{
37 /* One active buddy */
40 struct sync_buddy_info
*sbi
;
42 struct list_head node
;
45 /* Allocated in ->open() */
46 struct sync_filter_context
{
48 struct list_head buddies
;
49 struct timeval timeout
;
53 /* Allocated and freed in ->parse_config() and ->free_config(). */
54 struct sync_filter_config
{
55 struct sync_filter_args_info
*conf
;
56 struct sync_buddy_info
*buddy_info
;
59 #define FOR_EACH_BUDDY(_buddy, _list) \
60 list_for_each_entry(_buddy, _list, node)
61 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
62 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
64 static void sync_close_buddy(struct sync_buddy
*buddy
)
66 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy
->sbi
->url
, buddy
->fd
);
68 list_del(&buddy
->node
);
72 static void sync_close_buddies(struct sync_filter_context
*ctx
)
74 struct sync_buddy
*buddy
, *tmp
;
76 FOR_EACH_BUDDY_SAFE(buddy
, tmp
, &ctx
->buddies
)
77 sync_close_buddy(buddy
);
80 static void sync_close(struct filter_node
*fn
)
82 struct sync_filter_context
*ctx
= fn
->private_data
;
84 sync_close_buddies(ctx
);
85 if (ctx
->listen_fd
>= 0) {
86 close(ctx
->listen_fd
);
90 fn
->private_data
= NULL
;
93 static void sync_free_config(void *conf
)
95 struct sync_filter_config
*sfc
= conf
;
98 for (i
= 0; i
< sfc
->conf
->buddy_given
; i
++) {
99 free(sfc
->buddy_info
[i
].host
);
100 freeaddrinfo(sfc
->buddy_info
[i
].ai
);
102 sync_filter_cmdline_parser_free(sfc
->conf
);
106 static void sync_open(struct filter_node
*fn
)
109 struct sync_filter_config
*sfc
= fn
->conf
;
110 struct sync_buddy
*buddy
;
111 struct sync_filter_context
*ctx
;
115 ctx
= fn
->private_data
= para_calloc(sizeof(*ctx
));
116 INIT_LIST_HEAD(&ctx
->buddies
);
119 /* create socket to listen for incoming packets */
123 NULL
/* no host required */,
125 NULL
/* no flowopts */
128 PARA_ERROR_LOG("could not create UDP listening socket %d\n",
129 sfc
->conf
->port_arg
);
132 ctx
->listen_fd
= ret
;
133 PARA_INFO_LOG("listening on fd %d\n", ctx
->listen_fd
);
135 for (i
= 0; i
< sfc
->conf
->buddy_given
; i
++) {
136 struct sync_buddy_info
*sbi
= sfc
->buddy_info
+ i
;
137 const char *url
= sfc
->conf
->buddy_arg
[i
];
140 /* make buddy udp socket from address info */
142 ret
= makesock_addrinfo(
144 false /* not passive */,
146 NULL
/* no flowopts */
149 PARA_WARNING_LOG("could not make socket for %s\n",
154 ret
= mark_fd_nonblocking(fd
);
156 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
161 buddy
= para_malloc(sizeof(*buddy
));
164 buddy
->ping_received
= false;
165 para_list_add(&buddy
->node
, &ctx
->buddies
);
167 PARA_INFO_LOG("opened buddy %s on fd %d\n", url
, fd
);
170 PARA_WARNING_LOG("%s\n", para_strerror(-ret
));
175 * At parse config time, we build an array of struct sync_buddy_info with one
176 * entry for each buddy given in the arguments. This array is not affected by
177 * sync_close(), so information stored there can be used for multiple instances
178 * (para_audiod). We store the resolved url and the ->disabled bit in this
181 static int sync_parse_config(int argc
, char **argv
, void **result
)
184 struct sync_filter_config
*sfc
;
185 struct sync_filter_args_info
*conf
= para_malloc(sizeof(*conf
));
187 sync_filter_cmdline_parser(argc
, argv
, conf
); /* exits on error */
188 sfc
= para_calloc(sizeof(*sfc
));
190 n
= conf
->buddy_given
;
191 sfc
->buddy_info
= para_malloc((n
+ 1) * sizeof(*sfc
->buddy_info
));
192 PARA_INFO_LOG("initializing buddy info array of length %d\n", n
);
193 for (i
= 0; i
< n
; i
++) {
194 const char *url
= conf
->buddy_arg
[i
];
195 size_t len
= strlen(url
);
196 char *host
= para_malloc(len
+ 1);
199 struct sync_buddy_info
*sbi
= sfc
->buddy_info
+ i
;
201 if (!parse_url(url
, host
, len
, &port
)) {
203 PARA_ERROR_LOG("could not parse url %s\n", url
);
204 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
208 port
= conf
->port_arg
;
209 ret
= lookup_address(IPPROTO_UDP
, false /* not passive */,
212 PARA_ERROR_LOG("host lookup failure for %s\n", url
);
220 sbi
->disabled
= false;
221 PARA_DEBUG_LOG("buddy #%d: %s\n", i
, url
);
227 PARA_ERROR_LOG("%s\n", para_strerror(-ret
));
228 sync_free_config(sfc
);
233 * True if we sent a packet to all buddies and received a packet from each
236 static bool sync_complete(struct sync_filter_context
*ctx
)
238 struct sync_buddy
*buddy
;
242 FOR_EACH_BUDDY(buddy
, &ctx
->buddies
)
243 if (!buddy
->sbi
->disabled
&& !buddy
->ping_received
)
248 static void sync_disable_active_buddies(struct sync_filter_context
*ctx
)
250 struct sync_buddy
*buddy
;
252 FOR_EACH_BUDDY(buddy
, &ctx
->buddies
) {
253 if (buddy
->sbi
->disabled
)
255 if (buddy
->ping_received
== true)
257 PARA_NOTICE_LOG("disabling %s\n", buddy
->sbi
->url
);
258 buddy
->sbi
->disabled
= true;
262 static void sync_set_timeout(struct sync_filter_context
*ctx
,
263 struct sync_filter_config
*sfc
)
267 ms2tv(sfc
->conf
->timeout_arg
, &to
);
268 tv_add(now
, &to
, &ctx
->timeout
);
271 static void sync_pre_select(struct sched
*s
, void *context
)
274 struct filter_node
*fn
= context
;
275 struct sync_filter_context
*ctx
= fn
->private_data
;
276 struct sync_filter_config
*sfc
= fn
->conf
;
278 if (list_empty(&ctx
->buddies
))
279 return sched_min_delay(s
);
280 if (ctx
->listen_fd
< 0)
281 return sched_min_delay(s
);
282 ret
= btr_node_status(fn
->btrn
, 0, BTR_NT_INTERNAL
);
284 return sched_min_delay(s
);
285 para_fd_set(ctx
->listen_fd
, &s
->rfds
, &s
->max_fileno
);
288 if (ctx
->timeout
.tv_sec
== 0) { /* must ping buddies */
289 sync_set_timeout(ctx
, sfc
);
290 return sched_min_delay(s
);
292 if (sync_complete(ctx
)) /* push down what we have */
293 return sched_min_delay(s
);
294 sched_request_barrier_or_min_delay(&ctx
->timeout
, s
);
297 static struct sync_buddy
*sync_find_buddy(struct sockaddr
*addr
,
298 struct list_head
*list
)
300 struct sync_buddy
*buddy
;
302 FOR_EACH_BUDDY(buddy
, list
)
303 if (sockaddr_equal(buddy
->sbi
->ai
->ai_addr
, addr
))
308 static int sync_post_select(__a_unused
struct sched
*s
, void *context
)
311 struct filter_node
*fn
= context
;
312 struct sync_filter_context
*ctx
= fn
->private_data
;
313 struct sync_filter_config
*sfc
= fn
->conf
;
314 struct sync_buddy
*buddy
, *tmp
;
316 if (list_empty(&ctx
->buddies
))
318 ret
= -E_SYNC_LISTEN_FD
;
319 if (ctx
->listen_fd
< 0)
321 ret
= btr_node_status(fn
->btrn
, 0, BTR_NT_INTERNAL
);
326 if (ctx
->timeout
.tv_sec
== 0)
327 sync_set_timeout(ctx
, sfc
);
329 if (tv_diff(&ctx
->timeout
, now
, NULL
) < 0) {
330 sync_disable_active_buddies(ctx
);
334 if (!ctx
->ping_sent
) {
335 FOR_EACH_BUDDY_SAFE(buddy
, tmp
, &ctx
->buddies
) {
337 PARA_INFO_LOG("pinging %s (%s)\n",
338 buddy
->sbi
->url
, buddy
->sbi
->disabled
?
339 "disabled" : "enabled");
340 ret
= xwrite(buddy
->fd
, &c
, 1);
342 PARA_WARNING_LOG("failed to write to %s: %s\n",
343 buddy
->sbi
->url
, para_strerror(-ret
));
344 sync_close_buddy(buddy
);
347 ctx
->ping_sent
= true;
349 if (FD_ISSET(ctx
->listen_fd
, &s
->rfds
)) {
352 struct sockaddr src_addr
;
353 socklen_t len
= sizeof(src_addr
);
354 ret
= recvfrom(ctx
->listen_fd
, &c
, 1, MSG_DONTWAIT
,
357 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
359 ret
= -ERRNO_TO_PARA_ERROR(errno
);
362 buddy
= sync_find_buddy(&src_addr
, &ctx
->buddies
);
364 PARA_NOTICE_LOG("pinged by unknown\n");
367 PARA_DEBUG_LOG("pinged by %s\n", buddy
->sbi
->url
);
368 if (buddy
->sbi
->disabled
) {
369 PARA_NOTICE_LOG("enabling %s\n",
371 buddy
->sbi
->disabled
= false;
373 buddy
->ping_received
= true;
376 if (!sync_complete(ctx
))
379 * Although all enabled buddies are in sync we do not splice out
380 * ourselves immediately. We rather wait until the timeout expires,
381 * or the buddy list has become empty. This opens a time window
382 * for disabled buddies to become enabled by sending us a packet.
384 btr_pushdown(fn
->btrn
);
387 ret
= -E_SYNC_COMPLETE
; /* success */
390 PARA_WARNING_LOG("%s\n", para_strerror(-ret
));
392 sync_close_buddies(ctx
);
393 btr_splice_out_node(&fn
->btrn
);
399 * The synchronization filter.
401 * \param f Pointer to the struct to initialize.
403 void sync_filter_init(struct filter
*f
)
405 struct sync_filter_args_info dummy
;
407 sync_filter_cmdline_parser_init(&dummy
);
409 f
->close
= sync_close
;
410 f
->pre_select
= sync_pre_select
;
411 f
->post_select
= sync_post_select
;
412 f
->parse_config
= sync_parse_config
;
413 f
->free_config
= sync_free_config
;
414 f
->help
= (struct ggo_help
)DEFINE_GGO_HELP(sync_filter
);