The sync filter.
authorAndre Noll <maan@systemlinux.org>
Fri, 6 Sep 2013 23:28:35 +0000 (23:28 +0000)
committerAndre Noll <maan@systemlinux.org>
Wed, 1 Jan 2014 17:53:04 +0000 (17:53 +0000)
This adds a new filter for synchronization between clients. It works
by sending an UDP packet to other clients ("buddies").

To reduce latency, address resolution is only performed once on
startup.  Hence lookup_address() and makesock_addrinfo() of net.c
are made public.

This commit introduces new public function sockaddr_equal() in
net.c which compares two IPv4 or IPv6 addresses. It is used in
sync_find_buddy().

configure.ac
error.h
m4/gengetopt/sync_filter.m4 [new file with mode: 0644]
net.c
net.h
sync_filter.c [new file with mode: 0644]

index 36eab1142a30517110bc13eda34dd52850ab629f..cdbc0e56b0f51992973076a4ae602ca0a2d1e33a 100644 (file)
@@ -924,6 +924,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then
                amp_filter
                udp_recv
                prebuffer_filter
+               sync_filter
        "
        audiod_errlist_objs="$audiod_errlist_objs
                audiod
@@ -961,6 +962,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then
                wma_common
                wmadec_filter
                buffer_tree
+               sync_filter
        "
        if test "$have_openssl" = "yes"; then
                audiod_errlist_objs="$audiod_errlist_objs crypt"
@@ -1105,6 +1107,7 @@ filters="
        fecdec
        wmadec
        prebuffer
+       sync
 "
 filter_errlist_objs="
        filter_common
@@ -1129,12 +1132,14 @@ filter_errlist_objs="
        wmadec_filter
        buffer_tree
        net
+       sync_filter
 "
 filter_cmdline_objs="
        filter
        compress_filter
        amp_filter
        prebuffer_filter
+       sync_filter
 "
 
 if test "$have_vorbis" = "yes"; then
@@ -1311,6 +1316,7 @@ play_errlist_objs="
        write_common
        file_write
        version
+       sync_filter
 "
 play_cmdline_objs="
        http_recv
@@ -1322,6 +1328,7 @@ play_cmdline_objs="
        prebuffer_filter
        file_write
        play
+       sync_filter
 "
 if test "$have_core_audio" = "yes"; then
        play_errlist_objs="$play_errlist_objs osx_write ipc"
diff --git a/error.h b/error.h
index 82df5ca90d5035c2f458f2b7fbf9f97f55b83334..08b309ce398d8375c334eb420d6e94408a223189 100644 (file)
--- a/error.h
+++ b/error.h
@@ -37,8 +37,13 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define VERSION_ERRORS
 #define SCHED_ERRORS
 
+
 extern const char **para_errlist[];
 
+#define SYNC_FILTER_ERRORS\
+       PARA_ERROR(SYNC_COMPLETE, "all buddies in sync"), \
+       PARA_ERROR(SYNC_LISTEN_FD, "no fd to listen on"), \
+
 #define OSS_MIX_ERRORS \
        PARA_ERROR(OSS_MIXER_CHANNEL, "invalid mixer channel"), \
 
diff --git a/m4/gengetopt/sync_filter.m4 b/m4/gengetopt/sync_filter.m4
new file mode 100644 (file)
index 0000000..1e6f5f8
--- /dev/null
@@ -0,0 +1,45 @@
+args "--no-version --no-help"
+
+purpose "Synchronize playback between multiple clients."
+
+option "buddy" b
+#~~~~~~~~~~~~~~~
+"host to synchronize with"
+multiple
+string typestr = "url"
+optional
+details = "
+       This option may be given multiple times, one per buddy. Each
+       value may be given as a host, port pair in either IPv4 or
+       IPv6 form, with port being optional. If no port was specified
+       the listening port (as specified with --port, see below)
+       is used to send the synchronization packet to this buddy.
+"
+
+option "port" p
+#~~~~~~~~~~~~~~
+"UDP port for incoming synchronization packets"
+int typestr = "portnumber"
+default = "29900"
+optional
+details = "
+       The sync filter receives incoming synchronization packets on
+       this UDP port.
+"
+
+option "timeout" t
+#~~~~~~~~~~~~~~~~~
+"how long to wait for other clients"
+int typestr = "milliseconds"
+default = "2000"
+optional
+details = "
+       Once the sync filter receives its first chunk of input, a
+       synchronization period of the given number of milliseconds
+       begins. Playback is deferred until a synchronization packet
+       has been received from each defined buddy, or until the end
+       of the period. Buddies which did not send a synchronization
+       packet in time are temporarily disabled and are not waited for
+       during subsequent synchronization periods. They are re-enabled
+       automatically when another synchronization packet arrives.
+"
diff --git a/net.c b/net.c
index 70cf6a87dfb27bbba74912bb50467a0a74f966bd..986660fa8f12fd11649d9bd4b3310b01837811dc 100644 (file)
--- a/net.c
+++ b/net.c
@@ -355,7 +355,7 @@ void flowopt_cleanup(struct flowopts *fo)
        free(fo);
 }
 
-/*
+/**
  * Resolve an IPv4/IPv6 address.
  *
  * \param l4type The layer-4 type (\p IPPROTO_xxx).
@@ -373,7 +373,7 @@ void flowopt_cleanup(struct flowopts *fo)
  *
  * \sa getaddrinfo(3).
  */
-static int lookup_address(unsigned l4type, bool passive, const char *host,
+int lookup_address(unsigned l4type, bool passive, const char *host,
                int port_number, struct addrinfo **result)
 {
        int ret;
@@ -409,7 +409,7 @@ static int lookup_address(unsigned l4type, bool passive, const char *host,
        return 1;
 }
 
-/*
+/**
  * Create an active or passive socket.
  *
  * \param l4type \p IPPROTO_TCP, \p IPPROTO_UDP, or \p IPPROTO_DCCP.
@@ -426,7 +426,7 @@ static int lookup_address(unsigned l4type, bool passive, const char *host,
  * \sa \ref lookup_address(), \ref makesock(), ip(7), ipv6(7), bind(2),
  * connect(2).
  */
-static int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai,
+int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai,
                struct flowopts *fo)
 {
        int ret = -E_MAKESOCK, on = 1;
@@ -710,6 +710,32 @@ void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia)
                *ia = ((struct sockaddr_in *)sa)->sin_addr;
 }
 
+/**
+ * Compare the address part of IPv4/6 addresses.
+ *
+ * \param sa1 First address.
+ * \param sa2 Second address.
+ *
+ * \return True iff the IP address of \a sa1 and \a sa2 match.
+ */
+bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2)
+{
+       if (!sa1 || !sa2)
+               return false;
+       if (sa1->sa_family != sa2->sa_family)
+               return false;
+       if (sa1->sa_family == AF_INET) {
+               struct sockaddr_in *a1 = (typeof(a1))sa1,
+                       *a2 = (typeof (a2))sa2;
+               return a1->sin_addr.s_addr == a2->sin_addr.s_addr;
+       } else if (sa1->sa_family == AF_INET6) {
+               struct sockaddr_in6 *a1 = (typeof(a1))sa1,
+                       *a2 = (typeof (a2))sa2;
+               return !memcmp(a1, a2, sizeof(*a1));
+       } else
+               return false;
+}
+
 /**
  * Receive data from a file descriptor.
  *
diff --git a/net.h b/net.h
index 7aaddc0c65264652c1474deb7967603612063c4a..e249498e3901b307bb68c57fe9bb4ac4d33d93a2 100644 (file)
--- a/net.h
+++ b/net.h
@@ -102,12 +102,17 @@ _static_inline_ bool is_valid_ipv6_address(const char *address)
        return inet_pton(AF_INET6, address, &test_it) != 0;
 }
 
+int lookup_address(unsigned l4type, bool passive, const char *host,
+               int port_number, struct addrinfo **result);
+
 /**
  * Generic socket creation (passive and active sockets).
  */
-extern int makesock(unsigned l4type, bool passive,
-                   const char *host, uint16_t port_number,
-                   struct flowopts *fo);
+int makesock(unsigned l4type, bool passive, const char *host,
+               uint16_t port_number, struct flowopts *fo);
+
+int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai,
+               struct flowopts *fo);
 
 static inline int para_connect_simple(unsigned l4type,
                                      const char *host, uint16_t port)
@@ -116,6 +121,7 @@ static inline int para_connect_simple(unsigned l4type,
 }
 
 void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia);
+bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2);
 
 /**
  * Functions to support listening sockets.
diff --git a/sync_filter.c b/sync_filter.c
new file mode 100644 (file)
index 0000000..379b54b
--- /dev/null
@@ -0,0 +1,406 @@
+/*
+ * Copyright (C) 2013 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file sync_filter.c Playback synchronization filter. */
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <regex.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+#include "para.h"
+#include "sync_filter.cmdline.h"
+#include "list.h"
+#include "net.h"
+#include "sched.h"
+#include "ggo.h"
+#include "buffer_tree.h"
+#include "filter.h"
+#include "string.h"
+#include "fd.h"
+#include "error.h"
+
+struct sync_buddy_info {
+       const char *url;
+       char *host;
+       int port;
+       struct addrinfo *ai;
+       bool disabled;
+};
+
+/* per open/close data */
+struct sync_buddy {
+       int fd;
+       struct sync_buddy_info *sbi;
+       bool ping_received;
+       struct list_head node;
+};
+
+struct sync_filter_context {
+       int listen_fd;
+       struct list_head buddies;
+       struct timeval timeout;
+       bool ping_sent;
+};
+
+struct sync_filter_config {
+       struct sync_filter_args_info *conf;
+       struct sync_buddy_info *buddy_info;
+};
+
+#define FOR_EACH_BUDDY(_buddy, _list) \
+       list_for_each_entry(_buddy, _list, node)
+#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
+       list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
+
+static void sync_close_buddy(struct sync_buddy *buddy)
+{
+       if (buddy->fd < 0)
+               return;
+       PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
+       close(buddy->fd);
+       buddy->fd = -1;
+}
+
+static void sync_close_buddies(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, &ctx->buddies)
+               sync_close_buddy(buddy);
+}
+
+static void sync_close(struct filter_node *fn)
+{
+       struct sync_filter_context *ctx = fn->private_data;
+
+       sync_close_buddies(ctx);
+       if (ctx->listen_fd >= 0) {
+               close(ctx->listen_fd);
+               ctx->listen_fd = -1;
+       }
+       free(ctx);
+       fn->private_data = NULL;
+}
+
+static void sync_free_config(void *conf)
+{
+       struct sync_filter_config *sfc = conf;
+       int i;
+
+       for (i = 0; i < sfc->conf->buddy_given; i++) {
+               free(sfc->buddy_info[i].host);
+               freeaddrinfo(sfc->buddy_info[i].ai);
+       }
+       sync_filter_cmdline_parser_free(sfc->conf);
+       free(sfc);
+}
+
+static void sync_open(struct filter_node *fn)
+{
+       int i, ret;
+       struct sync_filter_config *sfc = fn->conf;
+       struct sync_buddy *buddy;
+       struct sync_filter_context *ctx;
+
+       assert(sfc);
+
+       ctx = fn->private_data = para_calloc(sizeof(*ctx));
+       INIT_LIST_HEAD(&ctx->buddies);
+       ctx->listen_fd = -1;
+
+       /* create socket to listen for incoming packets */
+       ret = makesock(
+               IPPROTO_UDP,
+               true /* passive */,
+               NULL /* no host required */,
+               sfc->conf->port_arg,
+               NULL /* no flowopts */
+       );
+       if (ret < 0) {
+               PARA_ERROR_LOG("could not create UDP listening socket %d\n",
+                       sfc->conf->port_arg);
+               return;
+       }
+       ctx->listen_fd = ret;
+       PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
+
+       for (i = 0; i < sfc->conf->buddy_given; i++) {
+               struct sync_buddy_info *sbi = sfc->buddy_info + i;
+               const char *url = sfc->conf->buddy_arg[i];
+               int fd;
+
+               /* make buddy udp socket from address info */
+               assert(sbi->ai);
+               ret = makesock_addrinfo(
+                       IPPROTO_UDP,
+                       false /* not passive */,
+                       sbi->ai,
+                       NULL /* no flowopts */
+               );
+               if (ret < 0) {
+                       PARA_WARNING_LOG("could not make socket for %s\n",
+                               url);
+                       goto fail;
+               }
+               fd = ret;
+               ret = mark_fd_nonblocking(fd);
+               if (ret < 0) {
+                       PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
+                               url);
+                       close(fd);
+                       goto fail;
+               }
+               buddy = para_malloc(sizeof(*buddy));
+               buddy->fd = fd;
+               buddy->sbi = sbi;
+               buddy->ping_received = false;
+               para_list_add(&buddy->node, &ctx->buddies);
+
+               PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
+               continue;
+fail:
+               PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+       }
+}
+
+static int sync_parse_config(int argc, char **argv, void **result)
+{
+       int i, ret, n;
+       struct sync_filter_config *sfc;
+       struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
+
+       sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
+       sfc = para_calloc(sizeof(*sfc));
+       sfc->conf = conf;
+       n = conf->buddy_given;
+       sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
+       PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
+       for (i = 0; i < n; i++) {
+               const char *url = conf->buddy_arg[i];
+               size_t len = strlen(url);
+               char *host = para_malloc(len + 1);
+               int port;
+               struct addrinfo *ai;
+               struct sync_buddy_info *sbi = sfc->buddy_info + i;
+
+               if (!parse_url(url, host, len, &port)) {
+                       free(host);
+                       PARA_ERROR_LOG("could not parse url %s\n", url);
+                       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+                       goto fail;
+               }
+               if (port < 0)
+                       port = conf->port_arg;
+               ret = lookup_address(IPPROTO_UDP, false /* not passive */,
+                       host, port, &ai);
+               if (ret < 0) {
+                       PARA_ERROR_LOG("host lookup failure for %s\n", url);
+                       free(host);
+                       goto fail;
+               }
+               sbi->url = url;
+               sbi->host = host;
+               sbi->port = port;
+               sbi->ai = ai;
+               sbi->disabled = false;
+               PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
+       }
+       *result = sfc;
+       return 1;
+fail:
+       assert(ret < 0);
+       PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       sync_free_config(sfc);
+       return ret;
+}
+
+/*
+ * True if we sent a packet to all budies and received a packet from each
+ * enabled buddy.
+ */
+static bool sync_complete(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       if (!ctx->ping_sent)
+               return false;
+       FOR_EACH_BUDDY(buddy, &ctx->buddies)
+               if (!buddy->sbi->disabled && !buddy->ping_received)
+                       return false;
+       return true;
+}
+
+static void sync_disable_active_buddies(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, &ctx->buddies) {
+               if (buddy->sbi->disabled)
+                       continue;
+               PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
+               buddy->sbi->disabled = true;
+       }
+}
+
+static void sync_set_timeout(struct sync_filter_context *ctx,
+               struct sync_filter_config *sfc)
+{
+       struct timeval to;
+
+       ms2tv(sfc->conf->timeout_arg, &to);
+       tv_add(now, &to, &ctx->timeout);
+}
+
+static void sync_pre_select(struct sched *s, struct task *t)
+{
+       int ret;
+       struct filter_node *fn = container_of(t, struct filter_node, task);
+       struct sync_filter_context *ctx = fn->private_data;
+       struct sync_filter_config *sfc = fn->conf;
+
+       if (list_empty(&ctx->buddies))
+               return sched_min_delay(s);
+       if (ctx->listen_fd < 0)
+               return sched_min_delay(s);
+       ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+       if (ret < 0)
+               return sched_min_delay(s);
+       para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
+       if (ret == 0)
+               return;
+       if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
+               sync_set_timeout(ctx, sfc);
+               return sched_min_delay(s);
+       }
+       if (sync_complete(ctx)) /* push down what we have */
+               return sched_min_delay(s);
+       sched_request_barrier_or_min_delay(&ctx->timeout, s);
+}
+
+static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
+               struct list_head *list)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, list)
+               if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
+                       return buddy;
+       return NULL;
+}
+
+static int sync_post_select(__a_unused struct sched *s, struct task *t)
+{
+       int ret;
+       struct filter_node *fn = container_of(t, struct filter_node, task);
+       struct sync_filter_context *ctx = fn->private_data;
+       struct sync_filter_config *sfc = fn->conf;
+       struct sync_buddy *buddy, *tmp;
+
+       if (list_empty(&ctx->buddies))
+               goto success;
+       ret = -E_SYNC_LISTEN_FD;
+       if (ctx->listen_fd < 0)
+               goto fail;
+       ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+       if (ret < 0)
+               goto fail;
+       if (ret == 0)
+               return 0;
+       if (ctx->timeout.tv_sec == 0)
+               sync_set_timeout(ctx, sfc);
+       else {
+               if (tv_diff(&ctx->timeout, now, NULL) < 0) {
+                       sync_disable_active_buddies(ctx);
+                       goto success;
+               }
+       }
+       if (!ctx->ping_sent) {
+               FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
+                       char c = '\0';
+                       PARA_INFO_LOG("pinging %s (%s)\n",
+                               buddy->sbi->url, buddy->sbi->disabled?
+                               "disabled" : "enabled");
+                       ret = xwrite(buddy->fd, &c, 1);
+                       sync_close_buddy(buddy);
+                       if (ret < 0) {
+                               PARA_WARNING_LOG("failed to write to %s: %s\n",
+                                       buddy->sbi->url, para_strerror(-ret));
+                               list_del(&buddy->node);
+                       }
+               }
+               ctx->ping_sent = true;
+       }
+       if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
+               char c;
+               for (;;) {
+                       struct sockaddr src_addr;
+                       socklen_t len = sizeof(src_addr);
+                       ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
+                               &src_addr, &len);
+                       if (ret < 0) {
+                               if (errno == EAGAIN || errno == EWOULDBLOCK)
+                                       break;
+                               ret = -ERRNO_TO_PARA_ERROR(errno);
+                               goto fail;
+                       }
+                       buddy = sync_find_buddy(&src_addr, &ctx->buddies);
+                       if (!buddy) {
+                               PARA_NOTICE_LOG("pinged by unknown\n");
+                               continue;
+                       }
+                       PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
+                       if (buddy->sbi->disabled) {
+                               PARA_NOTICE_LOG("enabling %s\n",
+                                       buddy->sbi->url);
+                               buddy->sbi->disabled = false;
+                       }
+                       list_del(&buddy->node);
+               }
+       }
+       if (!sync_complete(ctx))
+               return 1;
+       /*
+        * Although all enabled buddies are in sync we do not splice out
+        * ourselves immediately. We rather wait until the timout expires,
+        * or the buddy list has become empty. This opens a time window
+        * for disabled buddies to become enabled by sending us a packet.
+        */
+       btr_pushdown(fn->btrn);
+       return 1;
+success:
+       ret = -E_SYNC_COMPLETE; /* success */
+       goto out;
+fail:
+       PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+out:
+       sync_close_buddies(ctx);
+       btr_splice_out_node(&fn->btrn);
+       assert(ret < 0);
+       return ret;
+}
+
+/**
+ * The synchronization filter.
+ *
+ * \param f Pointer to the struct to initialize.
+ */
+void sync_filter_init(struct filter *f)
+{
+       struct sync_filter_args_info dummy;
+
+       sync_filter_cmdline_parser_init(&dummy);
+       f->open = sync_open;
+       f->close = sync_close;
+       f->pre_select = sync_pre_select;
+       f->post_select = sync_post_select;
+       f->parse_config = sync_parse_config;
+       f->free_config = sync_free_config;
+       f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
+}