mixer.c: Fix doxygen \file description.
[paraslash.git] / sync_filter.c
1 /* Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file sync_filter.c Playback synchronization filter. */
4
5 #include <netinet/in.h>
6 #include <sys/socket.h>
7 #include <regex.h>
8 #include <sys/types.h>
9 #include <arpa/inet.h>
10 #include <sys/un.h>
11 #include <netdb.h>
12 #include <lopsub.h>
13
14 #include "filter_cmd.lsg.h"
15 #include "para.h"
16 #include "list.h"
17 #include "net.h"
18 #include "sched.h"
19 #include "buffer_tree.h"
20 #include "filter.h"
21 #include "string.h"
22 #include "fd.h"
23 #include "error.h"
24
25 struct sync_buddy_info {
26 const char *url;
27 char *host;
28 int port;
29 struct addrinfo *ai;
30 bool disabled;
31 };
32
33 /* One active buddy */
34 struct sync_buddy {
35 int fd;
36 struct sync_buddy_info *sbi;
37 bool ping_received;
38 struct list_head node;
39 };
40
41 /* Allocated in ->open(), stored in fn->private_data */
42 struct sync_filter_context {
43 int listen_fd;
44 struct list_head buddies;
45 struct timeval timeout;
46 bool ping_sent;
47 };
48
49 #define FOR_EACH_BUDDY(_buddy, _list) \
50 list_for_each_entry(_buddy, _list, node)
51 #define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
52 list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
53
54 static void sync_close_buddy(struct sync_buddy *buddy)
55 {
56 PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
57 close(buddy->fd);
58 list_del(&buddy->node);
59 free(buddy);
60 }
61
62 static void sync_close_buddies(struct sync_filter_context *ctx)
63 {
64 struct sync_buddy *buddy, *tmp;
65
66 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
67 sync_close_buddy(buddy);
68 }
69
70 static void sync_close(struct filter_node *fn)
71 {
72 struct sync_filter_context *ctx = fn->private_data;
73
74 sync_close_buddies(ctx);
75 if (ctx->listen_fd >= 0) {
76 close(ctx->listen_fd);
77 ctx->listen_fd = -1;
78 }
79 free(ctx);
80 fn->private_data = NULL;
81 }
82
83 static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
84 {
85 struct sync_buddy_info *sbi = conf;
86 int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
87
88 for (i = 0; i < num_buddies; i++) {
89 free(sbi[i].host);
90 freeaddrinfo(sbi[i].ai);
91 }
92 free(sbi);
93 }
94
95 static void sync_open(struct filter_node *fn)
96 {
97 int i, ret;
98 struct sync_buddy *buddy;
99 struct sync_filter_context *ctx;
100 struct sync_buddy_info *sbi = fn->conf;
101 uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
102 unsigned buddy_given;
103 const struct lls_opt_result *r_b;
104
105 ctx = fn->private_data = para_calloc(sizeof(*ctx));
106 INIT_LIST_HEAD(&ctx->buddies);
107
108 /* create socket to listen for incoming packets */
109 ret = makesock(
110 IPPROTO_UDP,
111 true /* passive */,
112 NULL /* no host required */,
113 port,
114 NULL /* no flowopts */
115 );
116 if (ret < 0) {
117 PARA_ERROR_LOG("could not create UDP listening socket %u\n",
118 port);
119 return;
120 }
121 ctx->listen_fd = ret;
122 PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
123
124 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
125 buddy_given = lls_opt_given(r_b);
126 for (i = 0; i < buddy_given; i++) {
127 int fd;
128 const char *url = lls_string_val(i, r_b);
129
130 /* make buddy udp socket from address info */
131 assert(sbi->ai);
132 ret = makesock_addrinfo(
133 IPPROTO_UDP,
134 false /* not passive */,
135 sbi[i].ai,
136 NULL /* no flowopts */
137 );
138 if (ret < 0) {
139 PARA_WARNING_LOG("could not make socket for %s\n",
140 url);
141 goto fail;
142 }
143 fd = ret;
144 ret = mark_fd_nonblocking(fd);
145 if (ret < 0) {
146 PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
147 url);
148 close(fd);
149 goto fail;
150 }
151 buddy = para_malloc(sizeof(*buddy));
152 buddy->fd = fd;
153 buddy->sbi = sbi + i;
154 buddy->ping_received = false;
155 para_list_add(&buddy->node, &ctx->buddies);
156
157 PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
158 continue;
159 fail:
160 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
161 }
162 }
163
164 /*
165 * Build an array of struct sync_buddy_info with one entry for each buddy given
166 * in the arguments. This array is not affected by sync_close(), so information
167 * stored there can be used for multiple instances (para_audiod). We store the
168 * resolved url and the ->disabled bit in this array.
169 */
170 static void *sync_setup(const struct lls_parse_result *lpr)
171 {
172 int i, ret;
173 unsigned n;
174 struct sync_buddy_info *sbi;
175 const struct lls_opt_result *r_b;
176
177 r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
178 n = lls_opt_given(r_b);
179 sbi = para_malloc(n * sizeof(*sbi));
180 PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
181 for (i = 0; i < n; i++) {
182 const char *url = lls_string_val(i, r_b);
183 size_t len = strlen(url);
184 char *host = para_malloc(len + 1);
185 int port;
186 struct addrinfo *ai;
187
188 if (!parse_url(url, host, len, &port)) {
189 PARA_ERROR_LOG("could not parse url %s\n", url);
190 exit(EXIT_FAILURE);
191 }
192 if (port < 0)
193 port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
194 ret = lookup_address(IPPROTO_UDP, false /* not passive */,
195 host, port, &ai);
196 if (ret < 0) {
197 PARA_ERROR_LOG("host lookup failure for %s: %s\n",
198 url, para_strerror(-ret));
199 exit(EXIT_FAILURE);
200 }
201 sbi[i].url = url;
202 sbi[i].host = host;
203 sbi[i].port = port;
204 sbi[i].ai = ai;
205 sbi[i].disabled = false;
206 PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
207 }
208 return sbi;
209 }
210
211 /*
212 * True if we sent a packet to all buddies and received a packet from each
213 * enabled buddy.
214 */
215 static bool sync_complete(struct sync_filter_context *ctx)
216 {
217 struct sync_buddy *buddy;
218
219 if (!ctx->ping_sent)
220 return false;
221 FOR_EACH_BUDDY(buddy, &ctx->buddies)
222 if (!buddy->sbi->disabled && !buddy->ping_received)
223 return false;
224 return true;
225 }
226
227 static void sync_disable_active_buddies(struct sync_filter_context *ctx)
228 {
229 struct sync_buddy *buddy;
230
231 FOR_EACH_BUDDY(buddy, &ctx->buddies) {
232 if (buddy->sbi->disabled)
233 continue;
234 if (buddy->ping_received == true)
235 continue;
236 PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
237 buddy->sbi->disabled = true;
238 }
239 }
240
241 static void sync_set_timeout(struct sync_filter_context *ctx,
242 struct lls_parse_result *lpr)
243 {
244 uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
245 struct timeval to;
246
247 ms2tv(ms, &to);
248 tv_add(now, &to, &ctx->timeout);
249 }
250
251 static void sync_pre_select(struct sched *s, void *context)
252 {
253 int ret;
254 struct filter_node *fn = context;
255 struct sync_filter_context *ctx = fn->private_data;
256
257 if (list_empty(&ctx->buddies))
258 return sched_min_delay(s);
259 if (ctx->listen_fd < 0)
260 return sched_min_delay(s);
261 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
262 if (ret < 0)
263 return sched_min_delay(s);
264 para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
265 if (ret == 0)
266 return;
267 if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
268 sync_set_timeout(ctx, fn->lpr);
269 return sched_min_delay(s);
270 }
271 if (sync_complete(ctx)) /* push down what we have */
272 return sched_min_delay(s);
273 sched_request_barrier_or_min_delay(&ctx->timeout, s);
274 }
275
276 static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
277 struct list_head *list)
278 {
279 struct sync_buddy *buddy;
280
281 FOR_EACH_BUDDY(buddy, list)
282 if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
283 return buddy;
284 return NULL;
285 }
286
287 static int sync_post_select(__a_unused struct sched *s, void *context)
288 {
289 int ret;
290 struct filter_node *fn = context;
291 struct sync_filter_context *ctx = fn->private_data;
292 struct sync_buddy *buddy, *tmp;
293
294 if (list_empty(&ctx->buddies))
295 goto success;
296 ret = -E_SYNC_LISTEN_FD;
297 if (ctx->listen_fd < 0)
298 goto fail;
299 ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
300 if (ret < 0)
301 goto fail;
302 if (ret == 0)
303 return 0;
304 if (ctx->timeout.tv_sec == 0)
305 sync_set_timeout(ctx, fn->lpr);
306 else {
307 if (tv_diff(&ctx->timeout, now, NULL) < 0) {
308 sync_disable_active_buddies(ctx);
309 goto success;
310 }
311 }
312 if (!ctx->ping_sent) {
313 FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
314 char c = '\0';
315 PARA_INFO_LOG("pinging %s (%s)\n",
316 buddy->sbi->url, buddy->sbi->disabled?
317 "disabled" : "enabled");
318 ret = xwrite(buddy->fd, &c, 1);
319 if (ret < 0) {
320 PARA_WARNING_LOG("failed to write to %s: %s\n",
321 buddy->sbi->url, para_strerror(-ret));
322 sync_close_buddy(buddy);
323 }
324 }
325 ctx->ping_sent = true;
326 }
327 if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
328 char c;
329 for (;;) {
330 struct sockaddr src_addr;
331 socklen_t len = sizeof(src_addr);
332 ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
333 &src_addr, &len);
334 if (ret < 0) {
335 if (errno == EAGAIN || errno == EWOULDBLOCK)
336 break;
337 ret = -ERRNO_TO_PARA_ERROR(errno);
338 goto fail;
339 }
340 buddy = sync_find_buddy(&src_addr, &ctx->buddies);
341 if (!buddy) {
342 PARA_NOTICE_LOG("pinged by unknown\n");
343 continue;
344 }
345 PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
346 if (buddy->sbi->disabled) {
347 PARA_NOTICE_LOG("enabling %s\n",
348 buddy->sbi->url);
349 buddy->sbi->disabled = false;
350 }
351 buddy->ping_received = true;
352 }
353 }
354 if (!sync_complete(ctx))
355 return 1;
356 /*
357 * Although all enabled buddies are in sync we do not splice out
358 * ourselves immediately. We rather wait until the timeout expires,
359 * or the buddy list has become empty. This opens a time window
360 * for disabled buddies to become enabled by sending us a packet.
361 */
362 btr_pushdown(fn->btrn);
363 return 1;
364 success:
365 ret = -E_SYNC_COMPLETE; /* success */
366 goto out;
367 fail:
368 PARA_WARNING_LOG("%s\n", para_strerror(-ret));
369 out:
370 sync_close_buddies(ctx);
371 btr_splice_out_node(&fn->btrn);
372 assert(ret < 0);
373 return ret;
374 }
375
376 const struct filter lsg_filter_cmd_com_sync_user_data = {
377 .setup = sync_setup,
378 .open = sync_open,
379 .pre_select = sync_pre_select,
380 .post_select = sync_post_select,
381 .close = sync_close,
382 .teardown = sync_teardown
383 };