Merge branch 't/sync'
authorAndre Noll <maan@systemlinux.org>
Sat, 22 Feb 2014 16:29:10 +0000 (17:29 +0100)
committerAndre Noll <maan@systemlinux.org>
Sat, 22 Feb 2014 16:31:00 +0000 (17:31 +0100)
Cooking since 2014-01-14.

* t/sync:
  The sync filter.
  net: Let maksock() callers perform flowopt cleanup.
  net: makesock_addrinfo(): Make socketfd local to the loop.
  net: Let makesock() continue on setsockopt() failure.
  net: Reduce indentation level in makesock_addrinfo().
  net: Further simplify makesock_addrinfo().
  net: Replace the double loop of lookup_address() by a single loop.
  net: makesock(): Combine code for passive sockets.
  net: Change makesock_addrinfo() to set given flowopts before SO_REUSEADDR.
  net: Clarify code flow of makesock_addrinfo().
  net: Remove unnecessary condition in makesock_addrinfo().
  net: Remove pointless initialization in makesock_addrinfo().
  net: Kill dead code in makesock_addrinfo().
  net: Remove networking headers from para.h.
  net: Improve error handling of makesock_addrinfo().
  net: Split makesock(), part 2: Introduce makesock_addrinfo().
  net: Split makesock(), part 1: Introduce lookup_address().
  net: Simplify makesock().
  net: Fix parse_url().
  net: Correct \return text of parse_url().

24 files changed:
NEWS
acl.c
afs.c
audioc.c
audiod.c
audiod_command.c
client_common.c
command.c
configure.ac
dccp_recv.c
dccp_send.c
error.h
http_recv.c
http_send.c
m4/gengetopt/sync_filter.m4 [new file with mode: 0644]
net.c
net.h
para.h
send_common.c
server.c
sync_filter.c [new file with mode: 0644]
udp_recv.c
udp_send.c
vss.c

diff --git a/NEWS b/NEWS
index c3cf479..cc9b67a 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -1,10 +1,12 @@
 NEWS
 ====
 
----------------------------------
-0.5.2 (???) "orthogonal interior"
----------------------------------
+---------------------------------------------
+0.5.2 (to be announced) "orthogonal interior"
+---------------------------------------------
 
+       - The new sync synchronizes playback between multiple clients.
+       - Major cleanup of the networking subsystem.
        - Minor fixes to avoid clang warnings.
 
 ------------------------------------------
diff --git a/acl.c b/acl.c
index e70ab9b..7762a99 100644 (file)
--- a/acl.c
+++ b/acl.c
@@ -6,7 +6,12 @@
 
 /** \file acl.c Access control lists for paraslash senders. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
diff --git a/afs.c b/afs.c
index 2f9df84..135888a 100644 (file)
--- a/afs.c
+++ b/afs.c
@@ -6,10 +6,15 @@
 
 /** \file afs.c Paraslash's audio file selector. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <signal.h>
 #include <fnmatch.h>
 #include <osl.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "server.cmdline.h"
 #include "para.h"
index 0edab36..79051a9 100644 (file)
--- a/audioc.c
+++ b/audioc.c
@@ -6,8 +6,13 @@
 
 /** \file audioc.c The client program used to connect to para_audiod. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <stdbool.h>
 #include <signal.h>
 
index 5ef5778..f12dbc1 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -5,8 +5,14 @@
  */
 
 /** \file audiod.c The paraslash's audio daemon. */
+
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <signal.h>
 
 #include "para.h"
index 07b2c81..1ffab87 100644 (file)
@@ -6,8 +6,13 @@
 
 /** \file audiod_command.c Commands for para_audiod. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "audiod.cmdline.h"
index c19b712..6652cc3 100644 (file)
@@ -6,8 +6,13 @@
 
 /** \file client_common.c Common functions of para_client and para_audiod. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
index 8c74665..fc098b7 100644 (file)
--- a/command.c
+++ b/command.c
@@ -6,10 +6,15 @@
 
 /** \file command.c Client authentication and server commands. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <signal.h>
 #include <sys/types.h>
 #include <osl.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
index 36eab11..cdbc0e5 100644 (file)
@@ -924,6 +924,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then
                amp_filter
                udp_recv
                prebuffer_filter
+               sync_filter
        "
        audiod_errlist_objs="$audiod_errlist_objs
                audiod
@@ -961,6 +962,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then
                wma_common
                wmadec_filter
                buffer_tree
+               sync_filter
        "
        if test "$have_openssl" = "yes"; then
                audiod_errlist_objs="$audiod_errlist_objs crypt"
@@ -1105,6 +1107,7 @@ filters="
        fecdec
        wmadec
        prebuffer
+       sync
 "
 filter_errlist_objs="
        filter_common
@@ -1129,12 +1132,14 @@ filter_errlist_objs="
        wmadec_filter
        buffer_tree
        net
+       sync_filter
 "
 filter_cmdline_objs="
        filter
        compress_filter
        amp_filter
        prebuffer_filter
+       sync_filter
 "
 
 if test "$have_vorbis" = "yes"; then
@@ -1311,6 +1316,7 @@ play_errlist_objs="
        write_common
        file_write
        version
+       sync_filter
 "
 play_cmdline_objs="
        http_recv
@@ -1322,6 +1328,7 @@ play_cmdline_objs="
        prebuffer_filter
        file_write
        play
+       sync_filter
 "
 if test "$have_core_audio" = "yes"; then
        play_errlist_objs="$play_errlist_objs osx_write ipc"
index c751f2f..ca3432a 100644 (file)
  * (C) 2005 Ian McDonald <imcdnzl@gmail.com>
  */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
@@ -53,6 +58,7 @@ static int dccp_recv_open(struct receiver_node *rn)
        }
 
        fd = makesock(IPPROTO_DCCP, 0, conf->host_arg, conf->port_arg, fo);
+       flowopt_cleanup(fo);
        free(ccids);
        if (fd < 0)
                return fd;
index 3979982..22f2bd1 100644 (file)
  * (C) 2005 Ian McDonald <imcdnzl@gmail.com>
  */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
 #include <osl.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
diff --git a/error.h b/error.h
index 82df5ca..08b309c 100644 (file)
--- a/error.h
+++ b/error.h
@@ -37,8 +37,13 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define VERSION_ERRORS
 #define SCHED_ERRORS
 
+
 extern const char **para_errlist[];
 
+#define SYNC_FILTER_ERRORS\
+       PARA_ERROR(SYNC_COMPLETE, "all buddies in sync"), \
+       PARA_ERROR(SYNC_LISTEN_FD, "no fd to listen on"), \
+
 #define OSS_MIX_ERRORS \
        PARA_ERROR(OSS_MIXER_CHANNEL, "invalid mixer channel"), \
 
index 7db8ba1..9c42a1a 100644 (file)
@@ -7,7 +7,12 @@
 /** \file http_recv.c paraslash's http receiver */
 
 #include <regex.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
index 52383b0..60b877c 100644 (file)
@@ -6,9 +6,14 @@
 
 /** \file http_send.c paraslash's http sender */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
 #include <osl.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
diff --git a/m4/gengetopt/sync_filter.m4 b/m4/gengetopt/sync_filter.m4
new file mode 100644 (file)
index 0000000..1e6f5f8
--- /dev/null
@@ -0,0 +1,45 @@
+args "--no-version --no-help"
+
+purpose "Synchronize playback between multiple clients."
+
+option "buddy" b
+#~~~~~~~~~~~~~~~
+"host to synchronize with"
+multiple
+string typestr = "url"
+optional
+details = "
+       This option may be given multiple times, one per buddy. Each
+       value may be given as a host, port pair in either IPv4 or
+       IPv6 form, with port being optional. If no port was specified
+       the listening port (as specified with --port, see below)
+       is used to send the synchronization packet to this buddy.
+"
+
+option "port" p
+#~~~~~~~~~~~~~~
+"UDP port for incoming synchronization packets"
+int typestr = "portnumber"
+default = "29900"
+optional
+details = "
+       The sync filter receives incoming synchronization packets on
+       this UDP port.
+"
+
+option "timeout" t
+#~~~~~~~~~~~~~~~~~
+"how long to wait for other clients"
+int typestr = "milliseconds"
+default = "2000"
+optional
+details = "
+       Once the sync filter receives its first chunk of input, a
+       synchronization period of the given number of milliseconds
+       begins. Playback is deferred until a synchronization packet
+       has been received from each defined buddy, or until the end
+       of the period. Buddies which did not send a synchronization
+       packet in time are temporarily disabled and are not waited for
+       during subsequent synchronization periods. They are re-enabled
+       automatically when another synchronization packet arrives.
+"
diff --git a/net.c b/net.c
index 4a31f01..986660f 100644 (file)
--- a/net.c
+++ b/net.c
  */
 #define _GNU_SOURCE
 
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <sys/socket.h>
 #include <netdb.h>
 
 /* At least NetBSD needs these. */
@@ -142,9 +147,9 @@ static bool host_string_ok(const char *host)
  * \param hostlen The maximum length of \a host.
  * \param port   To return the port number (if any) of \a url.
  *
- * \return Pointer to \a host, or NULL if failed.
- * If NULL is returned, \a host and \a portnum are undefined. If no
- * port number was present in \a url, \a portnum is set to -1.
+ * \return Pointer to \a host, or \p NULL if failed.  If \p NULL is returned,
+ * \a host and \a port are undefined. If no port number was present in \a url,
+ * \a port is set to -1.
  *
  * \sa RFC 3986, 3.2.2/3.2.3
  */
@@ -168,16 +173,16 @@ char *parse_url(const char *url,
                if (*o++ != ']' || (*o != '\0' && *o != ':'))
                        goto failed;
        } else {
-               for (; (*c = *o == ':'? '\0' : *o); c++, o++)
-                       if (c == end)
+               for (; (*c = *o == ':'? '\0' : *o); c++, o++) {
+                       if (c == end && o[1])
                                goto failed;
+               }
        }
 
        if (*o == ':')
                if (para_atoi32(++o, port) < 0 ||
                    *port < 0 || *port > 0xffff)
                        goto failed;
-
        if (host_string_ok(host))
                return host;
 failed:
@@ -328,7 +333,14 @@ static void flowopt_setopts(int sockfd, struct flowopts *fo)
                }
 }
 
-static void flowopt_cleanup(struct flowopts *fo)
+/**
+ * Deallocate all resources of a flowopts structure.
+ *
+ * \param fo A pointer as returned from flowopt_new().
+ *
+ * It's OK to pass \p NULL here in which case the function does nothing.
+ */
+void flowopt_cleanup(struct flowopts *fo)
 {
        struct pre_conn_opt *cur, *next;
 
@@ -344,137 +356,148 @@ static void flowopt_cleanup(struct flowopts *fo)
 }
 
 /**
- * Resolve IPv4/IPv6 address and create a ready-to-use active or passive socket.
+ * Resolve an IPv4/IPv6 address.
  *
  * \param l4type The layer-4 type (\p IPPROTO_xxx).
- * \param passive Whether this is a passive (1) or active (0) socket.
+ * \param passive Whether \p AI_PASSIVE should be included as hint.
  * \param host Remote or local hostname or IPv/6 address string.
- * \param port_number Decimal port number.
- * \param fo Socket options to be set before making the connection.
+ * \param port_number Used to set the port in each returned address structure.
+ * \param result addrinfo structures are returned here.
  *
- * This creates a ready-made IPv4/v6 socket structure after looking up the
- * necessary parameters. The interpretation of \a host depends on the value of
- * \a passive:
- *     - on a passive socket host is interpreted as an interface IPv4/6 address
- *       (can be left NULL);
- *     - on an active socket, \a host is the peer DNS name or IPv4/6 address
- *       to connect to;
- *     - \a port_number is in either case the numeric port number (not service
- *       string).
- *
- * Furthermore, bind(2) is called on passive sockets, and connect(2) on active
- * sockets. The algorithm tries all possible address combinations until it
- * succeeds. If \a fo is supplied, options are set and cleanup is performed.
- *
- * \return This function returns 1 on success and \a -E_ADDRESS_LOOKUP when no
- * matching connection could be set up (with details in the error log).
- *
- *  \sa ipv6(7), getaddrinfo(3), bind(2), connect(2).
- */
-int makesock(unsigned l4type, bool passive,
-            const char *host, uint16_t port_number,
-            struct flowopts *fo)
-{
-       struct addrinfo *local = NULL, *src = NULL, *remote = NULL,
-               *dst = NULL, hints;
-       unsigned int    l3type = AF_UNSPEC;
-       int             rc, on = 1, sockfd = -1,
-                       socktype = sock_type(l4type);
+ * The interpretation of \a host depends on the value of \a passive. On a
+ * passive socket host is interpreted as an interface IPv4/6 address (can be
+ * left NULL). On an active socket, \a host is the peer DNS name or IPv4/6
+ * address to connect to.
+ *
+ * \return Standard.
+ *
+ * \sa getaddrinfo(3).
+ */
+int lookup_address(unsigned l4type, bool passive, const char *host,
+               int port_number, struct addrinfo **result)
+{
+       int ret;
        char port[6]; /* port number has at most 5 digits */
+       struct addrinfo *addr = NULL, hints;
 
-       sprintf(port, "%u", port_number);
+       *result = NULL;
+       sprintf(port, "%u", port_number & 0xffff);
        /* Set up address hint structure */
        memset(&hints, 0, sizeof(hints));
-       hints.ai_family = l3type;
-       hints.ai_socktype = socktype;
-       /* 
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = sock_type(l4type);
+       /*
         * getaddrinfo does not support SOCK_DCCP, so for the sake of lookup
         * (and only then) pretend to be UDP.
         */
        if (l4type == IPPROTO_DCCP)
                hints.ai_socktype = SOCK_DGRAM;
-
        /* only use addresses available on the host */
        hints.ai_flags = AI_ADDRCONFIG;
-       if (l3type == AF_INET6)
-               /* use v4-mapped-v6 if no v6 addresses found */
-               hints.ai_flags |= AI_V4MAPPED | AI_ALL;
-
        if (passive && host == NULL)
                hints.ai_flags |= AI_PASSIVE;
-
        /* Obtain local/remote address information */
-       if ((rc = getaddrinfo(host, port, &hints, passive ? &local : &remote))) {
-               PARA_ERROR_LOG("can not resolve %s address %s#%s: %s.\n",
-                               layer4_name(l4type),
-                               host? host : (passive? "[loopback]" : "[localhost]"),
-                               port, gai_strerror(rc));
-               rc = -E_ADDRESS_LOOKUP;
-               goto out;
+       ret = getaddrinfo(host, port, &hints, &addr);
+       if (ret != 0) {
+               PARA_ERROR_LOG("can not resolve %s address %s#%s: %s\n",
+                       layer4_name(l4type),
+                       host? host : (passive? "[loopback]" : "[localhost]"),
+                       port, gai_strerror(ret));
+               return -E_ADDRESS_LOOKUP;
        }
+       *result = addr;
+       return 1;
+}
 
-       /* Iterate over all src/dst combination, exhausting dst first */
-       for (src = local, dst = remote; src != NULL || dst != NULL; /* no op */ ) {
-               if (src && dst && src->ai_family == AF_INET
-                               && dst->ai_family == AF_INET6)
-                       goto get_next_dst; /* v4 -> v6 is not possible */
-
-               sockfd = socket(src ? src->ai_family : dst->ai_family,
-                       socktype, l4type);
-               if (sockfd < 0)
-                       goto get_next_dst;
+/**
+ * Create an active or passive socket.
+ *
+ * \param l4type \p IPPROTO_TCP, \p IPPROTO_UDP, or \p IPPROTO_DCCP.
+ * \param passive Whether to call bind(2) or connect(2).
+ * \param ai Address information as obtained from \ref lookup_address().
+ * \param fo Socket options to be set before making the connection.
+ *
+ * bind(2) is called on passive sockets, and connect(2) on active sockets. The
+ * algorithm tries all possible address combinations until it succeeds. If \a
+ * fo is supplied, options are set but cleanup must be performed in the caller.
+ *
+ * \return File descriptor on success, \p E_MAKESOCK on errors.
+ *
+ * \sa \ref lookup_address(), \ref makesock(), ip(7), ipv6(7), bind(2),
+ * connect(2).
+ */
+int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai,
+               struct flowopts *fo)
+{
+       int ret = -E_MAKESOCK, on = 1;
 
+       for (; ai; ai = ai->ai_next) {
+               int fd;
+               ret = socket(ai->ai_family, sock_type(l4type), l4type);
+               if (ret < 0)
+                       continue;
+               fd = ret;
+               flowopt_setopts(fd, fo);
+               if (!passive) {
+                       if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0)
+                               return fd;
+                       close(fd);
+                       continue;
+               }
                /*
                 * Reuse the address on passive sockets to avoid failure on
                 * restart (protocols using listen()) and when creating
                 * multiple listener instances (UDP multicast).
                 */
-               if (passive && setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
-                                                 &on, sizeof(on)) == -1) {
-                       rc = errno;
-                       close(sockfd);
-                       PARA_ERROR_LOG("can not set SO_REUSEADDR: %s\n",
-                                      strerror(rc));
-                       rc = -ERRNO_TO_PARA_ERROR(rc);
-                       break;
-               }
-               flowopt_setopts(sockfd, fo);
-
-               if (src) {
-                       if (bind(sockfd, src->ai_addr, src->ai_addrlen) < 0) {
-                               close(sockfd);
-                               goto get_next_src;
-                       }
-                       if (!dst) /* bind-only completed successfully */
-                               break;
+               if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on,
+                               sizeof(on)) == -1) {
+                       close(fd);
+                       continue;
                }
-
-               if (dst && connect(sockfd, dst->ai_addr, dst->ai_addrlen) == 0)
-                       break; /* connection completed successfully */
-               close(sockfd);
-get_next_dst:
-               if (dst && (dst = dst->ai_next))
+               if (bind(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
+                       close(fd);
                        continue;
-get_next_src:
-               if (src && (src = src->ai_next)) /* restart inner loop */
-                       dst = remote;
+               }
+               return fd;
        }
-out:
-       if (local)
-               freeaddrinfo(local);
-       if (remote)
-               freeaddrinfo(remote);
-       flowopt_cleanup(fo);
-
-       if (src == NULL && dst == NULL) {
-               if (rc >= 0)
-                       rc = -E_MAKESOCK;
-               PARA_ERROR_LOG("can not create %s socket %s#%s.\n",
-                       layer4_name(l4type), host? host : (passive?
-                       "[loopback]" : "[localhost]"), port);
-               return rc;
+       return -E_MAKESOCK;
+}
+
+/**
+ * Resolve IPv4/IPv6 address and create a ready-to-use active or passive socket.
+ *
+ * \param l4type The layer-4 type (\p IPPROTO_xxx).
+ * \param passive Whether this is a passive or active socket.
+ * \param host Passed to \ref lookup_address().
+ * \param port_number Passed to \ref lookup_address().
+ * \param fo Passed to \ref makesock_addrinfo().
+ *
+ * This creates a ready-made IPv4/v6 socket structure after looking up the
+ * necessary parameters. The function first calls \ref lookup_address() and
+ * passes the address information to makesock_addrinfo() to create and
+ * initialize the socket.
+ *
+ * \return The newly created file descriptor on success, a negative error code
+ * on failure.
+ *
+ * \sa \ref lookup_address(), \ref makesock_addrinfo().
+ */
+int makesock(unsigned l4type, bool passive, const char *host, uint16_t port_number,
+               struct flowopts *fo)
+{
+       struct addrinfo *ai;
+       int ret = lookup_address(l4type, passive, host, port_number, &ai);
+
+       if (ret >= 0)
+               ret = makesock_addrinfo(l4type, passive, ai, fo);
+       if (ai)
+               freeaddrinfo(ai);
+       if (ret < 0) {
+               PARA_ERROR_LOG("can not create %s socket %s#%d.\n",
+               layer4_name(l4type), host? host : (passive?
+               "[loopback]" : "[localhost]"), port_number);
        }
-       return sockfd;
+       return ret;
 }
 
 /**
@@ -687,6 +710,32 @@ void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia)
                *ia = ((struct sockaddr_in *)sa)->sin_addr;
 }
 
+/**
+ * Compare the address part of IPv4/6 addresses.
+ *
+ * \param sa1 First address.
+ * \param sa2 Second address.
+ *
+ * \return True iff the IP address of \a sa1 and \a sa2 match.
+ */
+bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2)
+{
+       if (!sa1 || !sa2)
+               return false;
+       if (sa1->sa_family != sa2->sa_family)
+               return false;
+       if (sa1->sa_family == AF_INET) {
+               struct sockaddr_in *a1 = (typeof(a1))sa1,
+                       *a2 = (typeof (a2))sa2;
+               return a1->sin_addr.s_addr == a2->sin_addr.s_addr;
+       } else if (sa1->sa_family == AF_INET6) {
+               struct sockaddr_in6 *a1 = (typeof(a1))sa1,
+                       *a2 = (typeof (a2))sa2;
+               return !memcmp(a1, a2, sizeof(*a1));
+       } else
+               return false;
+}
+
 /**
  * Receive data from a file descriptor.
  *
diff --git a/net.h b/net.h
index 0003fa9..e249498 100644 (file)
--- a/net.h
+++ b/net.h
@@ -61,6 +61,7 @@ struct flowopts;
 extern struct flowopts *flowopt_new(void);
 extern void flowopt_add(struct flowopts *fo, int level, int opt,
                const char *name, const void *val, int len);
+void flowopt_cleanup(struct flowopts *fo);
 /** Flowopt shortcut macros */
 #define OPT_ADD(fo, lev, opt, val, len)        flowopt_add(fo, lev, opt, #opt, val, len)
 
@@ -101,12 +102,17 @@ _static_inline_ bool is_valid_ipv6_address(const char *address)
        return inet_pton(AF_INET6, address, &test_it) != 0;
 }
 
+int lookup_address(unsigned l4type, bool passive, const char *host,
+               int port_number, struct addrinfo **result);
+
 /**
  * Generic socket creation (passive and active sockets).
  */
-extern int makesock(unsigned l4type, bool passive,
-                   const char *host, uint16_t port_number,
-                   struct flowopts *fo);
+int makesock(unsigned l4type, bool passive, const char *host,
+               uint16_t port_number, struct flowopts *fo);
+
+int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai,
+               struct flowopts *fo);
 
 static inline int para_connect_simple(unsigned l4type,
                                      const char *host, uint16_t port)
@@ -115,6 +121,7 @@ static inline int para_connect_simple(unsigned l4type,
 }
 
 void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia);
+bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2);
 
 /**
  * Functions to support listening sockets.
diff --git a/para.h b/para.h
index 29658f4..d1e266f 100644 (file)
--- a/para.h
+++ b/para.h
 #include <limits.h>
 #include <stdarg.h>
 #include <ctype.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <sys/un.h> /* needed by create_pf_socket */
 #include <string.h>
 #include <assert.h>
 #include <stdbool.h>
+#include <inttypes.h>
+#include <sys/uio.h>
 #include "gcc-compat.h"
 
 /** used in various contexts */
index 250a2a0..a16869b 100644 (file)
@@ -6,8 +6,13 @@
 
 /** \file send_common.c Functions used by more than one paraslash sender. */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <osl.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
index 70d9137..f92ef55 100644 (file)
--- a/server.c
+++ b/server.c
  *     - Forward error correction: \ref fec.c.
  */
 
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <signal.h>
 #include <regex.h>
 #include <osl.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
diff --git a/sync_filter.c b/sync_filter.c
new file mode 100644 (file)
index 0000000..379b54b
--- /dev/null
@@ -0,0 +1,406 @@
+/*
+ * Copyright (C) 2013 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file sync_filter.c Playback synchronization filter. */
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <regex.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+#include "para.h"
+#include "sync_filter.cmdline.h"
+#include "list.h"
+#include "net.h"
+#include "sched.h"
+#include "ggo.h"
+#include "buffer_tree.h"
+#include "filter.h"
+#include "string.h"
+#include "fd.h"
+#include "error.h"
+
+struct sync_buddy_info {
+       const char *url;
+       char *host;
+       int port;
+       struct addrinfo *ai;
+       bool disabled;
+};
+
+/* per open/close data */
+struct sync_buddy {
+       int fd;
+       struct sync_buddy_info *sbi;
+       bool ping_received;
+       struct list_head node;
+};
+
+struct sync_filter_context {
+       int listen_fd;
+       struct list_head buddies;
+       struct timeval timeout;
+       bool ping_sent;
+};
+
+struct sync_filter_config {
+       struct sync_filter_args_info *conf;
+       struct sync_buddy_info *buddy_info;
+};
+
+#define FOR_EACH_BUDDY(_buddy, _list) \
+       list_for_each_entry(_buddy, _list, node)
+#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
+       list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node)
+
+static void sync_close_buddy(struct sync_buddy *buddy)
+{
+       if (buddy->fd < 0)
+               return;
+       PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
+       close(buddy->fd);
+       buddy->fd = -1;
+}
+
+static void sync_close_buddies(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, &ctx->buddies)
+               sync_close_buddy(buddy);
+}
+
+static void sync_close(struct filter_node *fn)
+{
+       struct sync_filter_context *ctx = fn->private_data;
+
+       sync_close_buddies(ctx);
+       if (ctx->listen_fd >= 0) {
+               close(ctx->listen_fd);
+               ctx->listen_fd = -1;
+       }
+       free(ctx);
+       fn->private_data = NULL;
+}
+
+static void sync_free_config(void *conf)
+{
+       struct sync_filter_config *sfc = conf;
+       int i;
+
+       for (i = 0; i < sfc->conf->buddy_given; i++) {
+               free(sfc->buddy_info[i].host);
+               freeaddrinfo(sfc->buddy_info[i].ai);
+       }
+       sync_filter_cmdline_parser_free(sfc->conf);
+       free(sfc);
+}
+
+static void sync_open(struct filter_node *fn)
+{
+       int i, ret;
+       struct sync_filter_config *sfc = fn->conf;
+       struct sync_buddy *buddy;
+       struct sync_filter_context *ctx;
+
+       assert(sfc);
+
+       ctx = fn->private_data = para_calloc(sizeof(*ctx));
+       INIT_LIST_HEAD(&ctx->buddies);
+       ctx->listen_fd = -1;
+
+       /* create socket to listen for incoming packets */
+       ret = makesock(
+               IPPROTO_UDP,
+               true /* passive */,
+               NULL /* no host required */,
+               sfc->conf->port_arg,
+               NULL /* no flowopts */
+       );
+       if (ret < 0) {
+               PARA_ERROR_LOG("could not create UDP listening socket %d\n",
+                       sfc->conf->port_arg);
+               return;
+       }
+       ctx->listen_fd = ret;
+       PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
+
+       for (i = 0; i < sfc->conf->buddy_given; i++) {
+               struct sync_buddy_info *sbi = sfc->buddy_info + i;
+               const char *url = sfc->conf->buddy_arg[i];
+               int fd;
+
+               /* make buddy udp socket from address info */
+               assert(sbi->ai);
+               ret = makesock_addrinfo(
+                       IPPROTO_UDP,
+                       false /* not passive */,
+                       sbi->ai,
+                       NULL /* no flowopts */
+               );
+               if (ret < 0) {
+                       PARA_WARNING_LOG("could not make socket for %s\n",
+                               url);
+                       goto fail;
+               }
+               fd = ret;
+               ret = mark_fd_nonblocking(fd);
+               if (ret < 0) {
+                       PARA_ERROR_LOG("unable to set nonblock mode for %s\n",
+                               url);
+                       close(fd);
+                       goto fail;
+               }
+               buddy = para_malloc(sizeof(*buddy));
+               buddy->fd = fd;
+               buddy->sbi = sbi;
+               buddy->ping_received = false;
+               para_list_add(&buddy->node, &ctx->buddies);
+
+               PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd);
+               continue;
+fail:
+               PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+       }
+}
+
+static int sync_parse_config(int argc, char **argv, void **result)
+{
+       int i, ret, n;
+       struct sync_filter_config *sfc;
+       struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
+
+       sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
+       sfc = para_calloc(sizeof(*sfc));
+       sfc->conf = conf;
+       n = conf->buddy_given;
+       sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
+       PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
+       for (i = 0; i < n; i++) {
+               const char *url = conf->buddy_arg[i];
+               size_t len = strlen(url);
+               char *host = para_malloc(len + 1);
+               int port;
+               struct addrinfo *ai;
+               struct sync_buddy_info *sbi = sfc->buddy_info + i;
+
+               if (!parse_url(url, host, len, &port)) {
+                       free(host);
+                       PARA_ERROR_LOG("could not parse url %s\n", url);
+                       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+                       goto fail;
+               }
+               if (port < 0)
+                       port = conf->port_arg;
+               ret = lookup_address(IPPROTO_UDP, false /* not passive */,
+                       host, port, &ai);
+               if (ret < 0) {
+                       PARA_ERROR_LOG("host lookup failure for %s\n", url);
+                       free(host);
+                       goto fail;
+               }
+               sbi->url = url;
+               sbi->host = host;
+               sbi->port = port;
+               sbi->ai = ai;
+               sbi->disabled = false;
+               PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
+       }
+       *result = sfc;
+       return 1;
+fail:
+       assert(ret < 0);
+       PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       sync_free_config(sfc);
+       return ret;
+}
+
+/*
+ * True if we sent a packet to all budies and received a packet from each
+ * enabled buddy.
+ */
+static bool sync_complete(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       if (!ctx->ping_sent)
+               return false;
+       FOR_EACH_BUDDY(buddy, &ctx->buddies)
+               if (!buddy->sbi->disabled && !buddy->ping_received)
+                       return false;
+       return true;
+}
+
+static void sync_disable_active_buddies(struct sync_filter_context *ctx)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, &ctx->buddies) {
+               if (buddy->sbi->disabled)
+                       continue;
+               PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
+               buddy->sbi->disabled = true;
+       }
+}
+
+static void sync_set_timeout(struct sync_filter_context *ctx,
+               struct sync_filter_config *sfc)
+{
+       struct timeval to;
+
+       ms2tv(sfc->conf->timeout_arg, &to);
+       tv_add(now, &to, &ctx->timeout);
+}
+
+static void sync_pre_select(struct sched *s, struct task *t)
+{
+       int ret;
+       struct filter_node *fn = container_of(t, struct filter_node, task);
+       struct sync_filter_context *ctx = fn->private_data;
+       struct sync_filter_config *sfc = fn->conf;
+
+       if (list_empty(&ctx->buddies))
+               return sched_min_delay(s);
+       if (ctx->listen_fd < 0)
+               return sched_min_delay(s);
+       ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+       if (ret < 0)
+               return sched_min_delay(s);
+       para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
+       if (ret == 0)
+               return;
+       if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
+               sync_set_timeout(ctx, sfc);
+               return sched_min_delay(s);
+       }
+       if (sync_complete(ctx)) /* push down what we have */
+               return sched_min_delay(s);
+       sched_request_barrier_or_min_delay(&ctx->timeout, s);
+}
+
+static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
+               struct list_head *list)
+{
+       struct sync_buddy *buddy;
+
+       FOR_EACH_BUDDY(buddy, list)
+               if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr))
+                       return buddy;
+       return NULL;
+}
+
+static int sync_post_select(__a_unused struct sched *s, struct task *t)
+{
+       int ret;
+       struct filter_node *fn = container_of(t, struct filter_node, task);
+       struct sync_filter_context *ctx = fn->private_data;
+       struct sync_filter_config *sfc = fn->conf;
+       struct sync_buddy *buddy, *tmp;
+
+       if (list_empty(&ctx->buddies))
+               goto success;
+       ret = -E_SYNC_LISTEN_FD;
+       if (ctx->listen_fd < 0)
+               goto fail;
+       ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
+       if (ret < 0)
+               goto fail;
+       if (ret == 0)
+               return 0;
+       if (ctx->timeout.tv_sec == 0)
+               sync_set_timeout(ctx, sfc);
+       else {
+               if (tv_diff(&ctx->timeout, now, NULL) < 0) {
+                       sync_disable_active_buddies(ctx);
+                       goto success;
+               }
+       }
+       if (!ctx->ping_sent) {
+               FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) {
+                       char c = '\0';
+                       PARA_INFO_LOG("pinging %s (%s)\n",
+                               buddy->sbi->url, buddy->sbi->disabled?
+                               "disabled" : "enabled");
+                       ret = xwrite(buddy->fd, &c, 1);
+                       sync_close_buddy(buddy);
+                       if (ret < 0) {
+                               PARA_WARNING_LOG("failed to write to %s: %s\n",
+                                       buddy->sbi->url, para_strerror(-ret));
+                               list_del(&buddy->node);
+                       }
+               }
+               ctx->ping_sent = true;
+       }
+       if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
+               char c;
+               for (;;) {
+                       struct sockaddr src_addr;
+                       socklen_t len = sizeof(src_addr);
+                       ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT,
+                               &src_addr, &len);
+                       if (ret < 0) {
+                               if (errno == EAGAIN || errno == EWOULDBLOCK)
+                                       break;
+                               ret = -ERRNO_TO_PARA_ERROR(errno);
+                               goto fail;
+                       }
+                       buddy = sync_find_buddy(&src_addr, &ctx->buddies);
+                       if (!buddy) {
+                               PARA_NOTICE_LOG("pinged by unknown\n");
+                               continue;
+                       }
+                       PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url);
+                       if (buddy->sbi->disabled) {
+                               PARA_NOTICE_LOG("enabling %s\n",
+                                       buddy->sbi->url);
+                               buddy->sbi->disabled = false;
+                       }
+                       list_del(&buddy->node);
+               }
+       }
+       if (!sync_complete(ctx))
+               return 1;
+       /*
+        * Although all enabled buddies are in sync we do not splice out
+        * ourselves immediately. We rather wait until the timout expires,
+        * or the buddy list has become empty. This opens a time window
+        * for disabled buddies to become enabled by sending us a packet.
+        */
+       btr_pushdown(fn->btrn);
+       return 1;
+success:
+       ret = -E_SYNC_COMPLETE; /* success */
+       goto out;
+fail:
+       PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+out:
+       sync_close_buddies(ctx);
+       btr_splice_out_node(&fn->btrn);
+       assert(ret < 0);
+       return ret;
+}
+
+/**
+ * The synchronization filter.
+ *
+ * \param f Pointer to the struct to initialize.
+ */
+void sync_filter_init(struct filter *f)
+{
+       struct sync_filter_args_info dummy;
+
+       sync_filter_cmdline_parser_init(&dummy);
+       f->open = sync_open;
+       f->close = sync_close;
+       f->pre_select = sync_pre_select;
+       f->post_select = sync_post_select;
+       f->parse_config = sync_parse_config;
+       f->free_config = sync_free_config;
+       f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
+}
index 1648cad..96b6c73 100644 (file)
@@ -5,9 +5,14 @@
  */
 /** \file udp_recv.c Paraslash's udp receiver */
 
+#include <netinet/in.h>
 #include <regex.h>
 #include <sys/socket.h>
 #include <net/if.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
index 7930f09..4580f00 100644 (file)
@@ -6,13 +6,17 @@
 
 /** \file udp_send.c Para_server's udp sender. */
 
-
+#include <netinet/in.h>
+#include <sys/socket.h>
 #include <regex.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/udp.h>
 #include <net/if.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
 #include <osl.h>
+#include <netdb.h>
 
 #include "server.cmdline.h"
 #include "para.h"
diff --git a/vss.c b/vss.c
index 06707d6..2349b09 100644 (file)
--- a/vss.c
+++ b/vss.c
  * senders.
  */
 
+#include <sys/socket.h>
+#include <netinet/in.h>
 #include <regex.h>
 #include <osl.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"