From: Andre Noll Date: Fri, 6 Sep 2013 23:28:35 +0000 (+0000) Subject: The sync filter. X-Git-Tag: v0.5.2~11^2 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=bb4e94595a1e57e66deacd4efc3512d947f16f19 The sync filter. This adds a new filter for synchronization between clients. It works by sending an UDP packet to other clients ("buddies"). To reduce latency, address resolution is only performed once on startup. Hence lookup_address() and makesock_addrinfo() of net.c are made public. This commit introduces new public function sockaddr_equal() in net.c which compares two IPv4 or IPv6 addresses. It is used in sync_find_buddy(). --- diff --git a/configure.ac b/configure.ac index 36eab114..cdbc0e56 100644 --- a/configure.ac +++ b/configure.ac @@ -924,6 +924,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then amp_filter udp_recv prebuffer_filter + sync_filter " audiod_errlist_objs="$audiod_errlist_objs audiod @@ -961,6 +962,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then wma_common wmadec_filter buffer_tree + sync_filter " if test "$have_openssl" = "yes"; then audiod_errlist_objs="$audiod_errlist_objs crypt" @@ -1105,6 +1107,7 @@ filters=" fecdec wmadec prebuffer + sync " filter_errlist_objs=" filter_common @@ -1129,12 +1132,14 @@ filter_errlist_objs=" wmadec_filter buffer_tree net + sync_filter " filter_cmdline_objs=" filter compress_filter amp_filter prebuffer_filter + sync_filter " if test "$have_vorbis" = "yes"; then @@ -1311,6 +1316,7 @@ play_errlist_objs=" write_common file_write version + sync_filter " play_cmdline_objs=" http_recv @@ -1322,6 +1328,7 @@ play_cmdline_objs=" prebuffer_filter file_write play + sync_filter " if test "$have_core_audio" = "yes"; then play_errlist_objs="$play_errlist_objs osx_write ipc" diff --git a/error.h b/error.h index 82df5ca9..08b309ce 100644 --- a/error.h +++ b/error.h @@ -37,8 +37,13 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define VERSION_ERRORS #define SCHED_ERRORS + extern const char **para_errlist[]; +#define SYNC_FILTER_ERRORS\ + PARA_ERROR(SYNC_COMPLETE, "all buddies in sync"), \ + PARA_ERROR(SYNC_LISTEN_FD, "no fd to listen on"), \ + #define OSS_MIX_ERRORS \ PARA_ERROR(OSS_MIXER_CHANNEL, "invalid mixer channel"), \ diff --git a/m4/gengetopt/sync_filter.m4 b/m4/gengetopt/sync_filter.m4 new file mode 100644 index 00000000..1e6f5f82 --- /dev/null +++ b/m4/gengetopt/sync_filter.m4 @@ -0,0 +1,45 @@ +args "--no-version --no-help" + +purpose "Synchronize playback between multiple clients." + +option "buddy" b +#~~~~~~~~~~~~~~~ +"host to synchronize with" +multiple +string typestr = "url" +optional +details = " + This option may be given multiple times, one per buddy. Each + value may be given as a host, port pair in either IPv4 or + IPv6 form, with port being optional. If no port was specified + the listening port (as specified with --port, see below) + is used to send the synchronization packet to this buddy. +" + +option "port" p +#~~~~~~~~~~~~~~ +"UDP port for incoming synchronization packets" +int typestr = "portnumber" +default = "29900" +optional +details = " + The sync filter receives incoming synchronization packets on + this UDP port. +" + +option "timeout" t +#~~~~~~~~~~~~~~~~~ +"how long to wait for other clients" +int typestr = "milliseconds" +default = "2000" +optional +details = " + Once the sync filter receives its first chunk of input, a + synchronization period of the given number of milliseconds + begins. Playback is deferred until a synchronization packet + has been received from each defined buddy, or until the end + of the period. Buddies which did not send a synchronization + packet in time are temporarily disabled and are not waited for + during subsequent synchronization periods. They are re-enabled + automatically when another synchronization packet arrives. +" diff --git a/net.c b/net.c index 70cf6a87..986660fa 100644 --- a/net.c +++ b/net.c @@ -355,7 +355,7 @@ void flowopt_cleanup(struct flowopts *fo) free(fo); } -/* +/** * Resolve an IPv4/IPv6 address. * * \param l4type The layer-4 type (\p IPPROTO_xxx). @@ -373,7 +373,7 @@ void flowopt_cleanup(struct flowopts *fo) * * \sa getaddrinfo(3). */ -static int lookup_address(unsigned l4type, bool passive, const char *host, +int lookup_address(unsigned l4type, bool passive, const char *host, int port_number, struct addrinfo **result) { int ret; @@ -409,7 +409,7 @@ static int lookup_address(unsigned l4type, bool passive, const char *host, return 1; } -/* +/** * Create an active or passive socket. * * \param l4type \p IPPROTO_TCP, \p IPPROTO_UDP, or \p IPPROTO_DCCP. @@ -426,7 +426,7 @@ static int lookup_address(unsigned l4type, bool passive, const char *host, * \sa \ref lookup_address(), \ref makesock(), ip(7), ipv6(7), bind(2), * connect(2). */ -static int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai, +int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai, struct flowopts *fo) { int ret = -E_MAKESOCK, on = 1; @@ -710,6 +710,32 @@ void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia) *ia = ((struct sockaddr_in *)sa)->sin_addr; } +/** + * Compare the address part of IPv4/6 addresses. + * + * \param sa1 First address. + * \param sa2 Second address. + * + * \return True iff the IP address of \a sa1 and \a sa2 match. + */ +bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2) +{ + if (!sa1 || !sa2) + return false; + if (sa1->sa_family != sa2->sa_family) + return false; + if (sa1->sa_family == AF_INET) { + struct sockaddr_in *a1 = (typeof(a1))sa1, + *a2 = (typeof (a2))sa2; + return a1->sin_addr.s_addr == a2->sin_addr.s_addr; + } else if (sa1->sa_family == AF_INET6) { + struct sockaddr_in6 *a1 = (typeof(a1))sa1, + *a2 = (typeof (a2))sa2; + return !memcmp(a1, a2, sizeof(*a1)); + } else + return false; +} + /** * Receive data from a file descriptor. * diff --git a/net.h b/net.h index 7aaddc0c..e249498e 100644 --- a/net.h +++ b/net.h @@ -102,12 +102,17 @@ _static_inline_ bool is_valid_ipv6_address(const char *address) return inet_pton(AF_INET6, address, &test_it) != 0; } +int lookup_address(unsigned l4type, bool passive, const char *host, + int port_number, struct addrinfo **result); + /** * Generic socket creation (passive and active sockets). */ -extern int makesock(unsigned l4type, bool passive, - const char *host, uint16_t port_number, - struct flowopts *fo); +int makesock(unsigned l4type, bool passive, const char *host, + uint16_t port_number, struct flowopts *fo); + +int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai, + struct flowopts *fo); static inline int para_connect_simple(unsigned l4type, const char *host, uint16_t port) @@ -116,6 +121,7 @@ static inline int para_connect_simple(unsigned l4type, } void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia); +bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2); /** * Functions to support listening sockets. diff --git a/sync_filter.c b/sync_filter.c new file mode 100644 index 00000000..379b54b1 --- /dev/null +++ b/sync_filter.c @@ -0,0 +1,406 @@ +/* + * Copyright (C) 2013 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file sync_filter.c Playback synchronization filter. */ + +#include +#include +#include +#include +#include +#include +#include + +#include "para.h" +#include "sync_filter.cmdline.h" +#include "list.h" +#include "net.h" +#include "sched.h" +#include "ggo.h" +#include "buffer_tree.h" +#include "filter.h" +#include "string.h" +#include "fd.h" +#include "error.h" + +struct sync_buddy_info { + const char *url; + char *host; + int port; + struct addrinfo *ai; + bool disabled; +}; + +/* per open/close data */ +struct sync_buddy { + int fd; + struct sync_buddy_info *sbi; + bool ping_received; + struct list_head node; +}; + +struct sync_filter_context { + int listen_fd; + struct list_head buddies; + struct timeval timeout; + bool ping_sent; +}; + +struct sync_filter_config { + struct sync_filter_args_info *conf; + struct sync_buddy_info *buddy_info; +}; + +#define FOR_EACH_BUDDY(_buddy, _list) \ + list_for_each_entry(_buddy, _list, node) +#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \ + list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node) + +static void sync_close_buddy(struct sync_buddy *buddy) +{ + if (buddy->fd < 0) + return; + PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url); + close(buddy->fd); + buddy->fd = -1; +} + +static void sync_close_buddies(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, &ctx->buddies) + sync_close_buddy(buddy); +} + +static void sync_close(struct filter_node *fn) +{ + struct sync_filter_context *ctx = fn->private_data; + + sync_close_buddies(ctx); + if (ctx->listen_fd >= 0) { + close(ctx->listen_fd); + ctx->listen_fd = -1; + } + free(ctx); + fn->private_data = NULL; +} + +static void sync_free_config(void *conf) +{ + struct sync_filter_config *sfc = conf; + int i; + + for (i = 0; i < sfc->conf->buddy_given; i++) { + free(sfc->buddy_info[i].host); + freeaddrinfo(sfc->buddy_info[i].ai); + } + sync_filter_cmdline_parser_free(sfc->conf); + free(sfc); +} + +static void sync_open(struct filter_node *fn) +{ + int i, ret; + struct sync_filter_config *sfc = fn->conf; + struct sync_buddy *buddy; + struct sync_filter_context *ctx; + + assert(sfc); + + ctx = fn->private_data = para_calloc(sizeof(*ctx)); + INIT_LIST_HEAD(&ctx->buddies); + ctx->listen_fd = -1; + + /* create socket to listen for incoming packets */ + ret = makesock( + IPPROTO_UDP, + true /* passive */, + NULL /* no host required */, + sfc->conf->port_arg, + NULL /* no flowopts */ + ); + if (ret < 0) { + PARA_ERROR_LOG("could not create UDP listening socket %d\n", + sfc->conf->port_arg); + return; + } + ctx->listen_fd = ret; + PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd); + + for (i = 0; i < sfc->conf->buddy_given; i++) { + struct sync_buddy_info *sbi = sfc->buddy_info + i; + const char *url = sfc->conf->buddy_arg[i]; + int fd; + + /* make buddy udp socket from address info */ + assert(sbi->ai); + ret = makesock_addrinfo( + IPPROTO_UDP, + false /* not passive */, + sbi->ai, + NULL /* no flowopts */ + ); + if (ret < 0) { + PARA_WARNING_LOG("could not make socket for %s\n", + url); + goto fail; + } + fd = ret; + ret = mark_fd_nonblocking(fd); + if (ret < 0) { + PARA_ERROR_LOG("unable to set nonblock mode for %s\n", + url); + close(fd); + goto fail; + } + buddy = para_malloc(sizeof(*buddy)); + buddy->fd = fd; + buddy->sbi = sbi; + buddy->ping_received = false; + para_list_add(&buddy->node, &ctx->buddies); + + PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd); + continue; +fail: + PARA_WARNING_LOG("%s\n", para_strerror(-ret)); + } +} + +static int sync_parse_config(int argc, char **argv, void **result) +{ + int i, ret, n; + struct sync_filter_config *sfc; + struct sync_filter_args_info *conf = para_malloc(sizeof(*conf)); + + sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */ + sfc = para_calloc(sizeof(*sfc)); + sfc->conf = conf; + n = conf->buddy_given; + sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info)); + PARA_INFO_LOG("initializing buddy info array of length %d\n", n); + for (i = 0; i < n; i++) { + const char *url = conf->buddy_arg[i]; + size_t len = strlen(url); + char *host = para_malloc(len + 1); + int port; + struct addrinfo *ai; + struct sync_buddy_info *sbi = sfc->buddy_info + i; + + if (!parse_url(url, host, len, &port)) { + free(host); + PARA_ERROR_LOG("could not parse url %s\n", url); + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + goto fail; + } + if (port < 0) + port = conf->port_arg; + ret = lookup_address(IPPROTO_UDP, false /* not passive */, + host, port, &ai); + if (ret < 0) { + PARA_ERROR_LOG("host lookup failure for %s\n", url); + free(host); + goto fail; + } + sbi->url = url; + sbi->host = host; + sbi->port = port; + sbi->ai = ai; + sbi->disabled = false; + PARA_DEBUG_LOG("buddy #%d: %s\n", i, url); + } + *result = sfc; + return 1; +fail: + assert(ret < 0); + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + sync_free_config(sfc); + return ret; +} + +/* + * True if we sent a packet to all budies and received a packet from each + * enabled buddy. + */ +static bool sync_complete(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + if (!ctx->ping_sent) + return false; + FOR_EACH_BUDDY(buddy, &ctx->buddies) + if (!buddy->sbi->disabled && !buddy->ping_received) + return false; + return true; +} + +static void sync_disable_active_buddies(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, &ctx->buddies) { + if (buddy->sbi->disabled) + continue; + PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url); + buddy->sbi->disabled = true; + } +} + +static void sync_set_timeout(struct sync_filter_context *ctx, + struct sync_filter_config *sfc) +{ + struct timeval to; + + ms2tv(sfc->conf->timeout_arg, &to); + tv_add(now, &to, &ctx->timeout); +} + +static void sync_pre_select(struct sched *s, struct task *t) +{ + int ret; + struct filter_node *fn = container_of(t, struct filter_node, task); + struct sync_filter_context *ctx = fn->private_data; + struct sync_filter_config *sfc = fn->conf; + + if (list_empty(&ctx->buddies)) + return sched_min_delay(s); + if (ctx->listen_fd < 0) + return sched_min_delay(s); + ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL); + if (ret < 0) + return sched_min_delay(s); + para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno); + if (ret == 0) + return; + if (ctx->timeout.tv_sec == 0) { /* must ping buddies */ + sync_set_timeout(ctx, sfc); + return sched_min_delay(s); + } + if (sync_complete(ctx)) /* push down what we have */ + return sched_min_delay(s); + sched_request_barrier_or_min_delay(&ctx->timeout, s); +} + +static struct sync_buddy *sync_find_buddy(struct sockaddr *addr, + struct list_head *list) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, list) + if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr)) + return buddy; + return NULL; +} + +static int sync_post_select(__a_unused struct sched *s, struct task *t) +{ + int ret; + struct filter_node *fn = container_of(t, struct filter_node, task); + struct sync_filter_context *ctx = fn->private_data; + struct sync_filter_config *sfc = fn->conf; + struct sync_buddy *buddy, *tmp; + + if (list_empty(&ctx->buddies)) + goto success; + ret = -E_SYNC_LISTEN_FD; + if (ctx->listen_fd < 0) + goto fail; + ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL); + if (ret < 0) + goto fail; + if (ret == 0) + return 0; + if (ctx->timeout.tv_sec == 0) + sync_set_timeout(ctx, sfc); + else { + if (tv_diff(&ctx->timeout, now, NULL) < 0) { + sync_disable_active_buddies(ctx); + goto success; + } + } + if (!ctx->ping_sent) { + FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) { + char c = '\0'; + PARA_INFO_LOG("pinging %s (%s)\n", + buddy->sbi->url, buddy->sbi->disabled? + "disabled" : "enabled"); + ret = xwrite(buddy->fd, &c, 1); + sync_close_buddy(buddy); + if (ret < 0) { + PARA_WARNING_LOG("failed to write to %s: %s\n", + buddy->sbi->url, para_strerror(-ret)); + list_del(&buddy->node); + } + } + ctx->ping_sent = true; + } + if (FD_ISSET(ctx->listen_fd, &s->rfds)) { + char c; + for (;;) { + struct sockaddr src_addr; + socklen_t len = sizeof(src_addr); + ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT, + &src_addr, &len); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + ret = -ERRNO_TO_PARA_ERROR(errno); + goto fail; + } + buddy = sync_find_buddy(&src_addr, &ctx->buddies); + if (!buddy) { + PARA_NOTICE_LOG("pinged by unknown\n"); + continue; + } + PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url); + if (buddy->sbi->disabled) { + PARA_NOTICE_LOG("enabling %s\n", + buddy->sbi->url); + buddy->sbi->disabled = false; + } + list_del(&buddy->node); + } + } + if (!sync_complete(ctx)) + return 1; + /* + * Although all enabled buddies are in sync we do not splice out + * ourselves immediately. We rather wait until the timout expires, + * or the buddy list has become empty. This opens a time window + * for disabled buddies to become enabled by sending us a packet. + */ + btr_pushdown(fn->btrn); + return 1; +success: + ret = -E_SYNC_COMPLETE; /* success */ + goto out; +fail: + PARA_WARNING_LOG("%s\n", para_strerror(-ret)); +out: + sync_close_buddies(ctx); + btr_splice_out_node(&fn->btrn); + assert(ret < 0); + return ret; +} + +/** + * The synchronization filter. + * + * \param f Pointer to the struct to initialize. + */ +void sync_filter_init(struct filter *f) +{ + struct sync_filter_args_info dummy; + + sync_filter_cmdline_parser_init(&dummy); + f->open = sync_open; + f->close = sync_close; + f->pre_select = sync_pre_select; + f->post_select = sync_post_select; + f->parse_config = sync_parse_config; + f->free_config = sync_free_config; + f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter); +}