From: Andre Noll Date: Sat, 22 Feb 2014 16:29:10 +0000 (+0100) Subject: Merge branch 't/sync' X-Git-Tag: v0.5.2~11 X-Git-Url: http://git.tuebingen.mpg.de/?a=commitdiff_plain;h=d87806284b9f6be9aab71ccbf0280d273b57eeb3;hp=e6e3a404091110ac770f65dad64e194f5900cd27;p=paraslash.git Merge branch 't/sync' Cooking since 2014-01-14. * t/sync: The sync filter. net: Let maksock() callers perform flowopt cleanup. net: makesock_addrinfo(): Make socketfd local to the loop. net: Let makesock() continue on setsockopt() failure. net: Reduce indentation level in makesock_addrinfo(). net: Further simplify makesock_addrinfo(). net: Replace the double loop of lookup_address() by a single loop. net: makesock(): Combine code for passive sockets. net: Change makesock_addrinfo() to set given flowopts before SO_REUSEADDR. net: Clarify code flow of makesock_addrinfo(). net: Remove unnecessary condition in makesock_addrinfo(). net: Remove pointless initialization in makesock_addrinfo(). net: Kill dead code in makesock_addrinfo(). net: Remove networking headers from para.h. net: Improve error handling of makesock_addrinfo(). net: Split makesock(), part 2: Introduce makesock_addrinfo(). net: Split makesock(), part 1: Introduce lookup_address(). net: Simplify makesock(). net: Fix parse_url(). net: Correct \return text of parse_url(). --- diff --git a/NEWS b/NEWS index c3cf479a..cc9b67ad 100644 --- a/NEWS +++ b/NEWS @@ -1,10 +1,12 @@ NEWS ==== ---------------------------------- -0.5.2 (???) "orthogonal interior" ---------------------------------- +--------------------------------------------- +0.5.2 (to be announced) "orthogonal interior" +--------------------------------------------- + - The new sync synchronizes playback between multiple clients. + - Major cleanup of the networking subsystem. - Minor fixes to avoid clang warnings. ------------------------------------------ diff --git a/acl.c b/acl.c index e70ab9b3..7762a990 100644 --- a/acl.c +++ b/acl.c @@ -6,7 +6,12 @@ /** \file acl.c Access control lists for paraslash senders. */ +#include +#include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/afs.c b/afs.c index 2f9df845..135888a7 100644 --- a/afs.c +++ b/afs.c @@ -6,10 +6,15 @@ /** \file afs.c Paraslash's audio file selector. */ +#include +#include #include #include #include #include +#include +#include +#include #include "server.cmdline.h" #include "para.h" diff --git a/audioc.c b/audioc.c index 0edab366..79051a98 100644 --- a/audioc.c +++ b/audioc.c @@ -6,8 +6,13 @@ /** \file audioc.c The client program used to connect to para_audiod. */ +#include +#include #include #include +#include +#include +#include #include #include diff --git a/audiod.c b/audiod.c index 5ef57781..f12dbc1b 100644 --- a/audiod.c +++ b/audiod.c @@ -5,8 +5,14 @@ */ /** \file audiod.c The paraslash's audio daemon. */ + +#include +#include #include #include +#include +#include +#include #include #include "para.h" diff --git a/audiod_command.c b/audiod_command.c index 07b2c81c..1ffab87c 100644 --- a/audiod_command.c +++ b/audiod_command.c @@ -6,8 +6,13 @@ /** \file audiod_command.c Commands for para_audiod. */ +#include +#include #include #include +#include +#include +#include #include "para.h" #include "audiod.cmdline.h" diff --git a/client_common.c b/client_common.c index c19b7121..6652cc35 100644 --- a/client_common.c +++ b/client_common.c @@ -6,8 +6,13 @@ /** \file client_common.c Common functions of para_client and para_audiod. */ +#include +#include #include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/command.c b/command.c index 8c746652..fc098b78 100644 --- a/command.c +++ b/command.c @@ -6,10 +6,15 @@ /** \file command.c Client authentication and server commands. */ +#include +#include #include #include #include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/configure.ac b/configure.ac index 36eab114..cdbc0e56 100644 --- a/configure.ac +++ b/configure.ac @@ -924,6 +924,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then amp_filter udp_recv prebuffer_filter + sync_filter " audiod_errlist_objs="$audiod_errlist_objs audiod @@ -961,6 +962,7 @@ if test "$have_openssl" = "yes" -o "$have_gcrypt" = "yes"; then wma_common wmadec_filter buffer_tree + sync_filter " if test "$have_openssl" = "yes"; then audiod_errlist_objs="$audiod_errlist_objs crypt" @@ -1105,6 +1107,7 @@ filters=" fecdec wmadec prebuffer + sync " filter_errlist_objs=" filter_common @@ -1129,12 +1132,14 @@ filter_errlist_objs=" wmadec_filter buffer_tree net + sync_filter " filter_cmdline_objs=" filter compress_filter amp_filter prebuffer_filter + sync_filter " if test "$have_vorbis" = "yes"; then @@ -1311,6 +1316,7 @@ play_errlist_objs=" write_common file_write version + sync_filter " play_cmdline_objs=" http_recv @@ -1322,6 +1328,7 @@ play_cmdline_objs=" prebuffer_filter file_write play + sync_filter " if test "$have_core_audio" = "yes"; then play_errlist_objs="$play_errlist_objs osx_write ipc" diff --git a/dccp_recv.c b/dccp_recv.c index c751f2f7..ca3432a3 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -11,8 +11,13 @@ * (C) 2005 Ian McDonald */ +#include +#include #include #include +#include +#include +#include #include "para.h" #include "error.h" @@ -53,6 +58,7 @@ static int dccp_recv_open(struct receiver_node *rn) } fd = makesock(IPPROTO_DCCP, 0, conf->host_arg, conf->port_arg, fo); + flowopt_cleanup(fo); free(ccids); if (fd < 0) return fd; diff --git a/dccp_send.c b/dccp_send.c index 3979982c..22f2bd1b 100644 --- a/dccp_send.c +++ b/dccp_send.c @@ -11,9 +11,14 @@ * (C) 2005 Ian McDonald */ +#include +#include #include #include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/error.h b/error.h index 82df5ca9..08b309ce 100644 --- a/error.h +++ b/error.h @@ -37,8 +37,13 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define VERSION_ERRORS #define SCHED_ERRORS + extern const char **para_errlist[]; +#define SYNC_FILTER_ERRORS\ + PARA_ERROR(SYNC_COMPLETE, "all buddies in sync"), \ + PARA_ERROR(SYNC_LISTEN_FD, "no fd to listen on"), \ + #define OSS_MIX_ERRORS \ PARA_ERROR(OSS_MIXER_CHANNEL, "invalid mixer channel"), \ diff --git a/http_recv.c b/http_recv.c index 7db8ba19..9c42a1a8 100644 --- a/http_recv.c +++ b/http_recv.c @@ -7,7 +7,12 @@ /** \file http_recv.c paraslash's http receiver */ #include +#include +#include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/http_send.c b/http_send.c index 52383b06..60b877c0 100644 --- a/http_send.c +++ b/http_send.c @@ -6,9 +6,14 @@ /** \file http_send.c paraslash's http sender */ +#include +#include #include #include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/m4/gengetopt/sync_filter.m4 b/m4/gengetopt/sync_filter.m4 new file mode 100644 index 00000000..1e6f5f82 --- /dev/null +++ b/m4/gengetopt/sync_filter.m4 @@ -0,0 +1,45 @@ +args "--no-version --no-help" + +purpose "Synchronize playback between multiple clients." + +option "buddy" b +#~~~~~~~~~~~~~~~ +"host to synchronize with" +multiple +string typestr = "url" +optional +details = " + This option may be given multiple times, one per buddy. Each + value may be given as a host, port pair in either IPv4 or + IPv6 form, with port being optional. If no port was specified + the listening port (as specified with --port, see below) + is used to send the synchronization packet to this buddy. +" + +option "port" p +#~~~~~~~~~~~~~~ +"UDP port for incoming synchronization packets" +int typestr = "portnumber" +default = "29900" +optional +details = " + The sync filter receives incoming synchronization packets on + this UDP port. +" + +option "timeout" t +#~~~~~~~~~~~~~~~~~ +"how long to wait for other clients" +int typestr = "milliseconds" +default = "2000" +optional +details = " + Once the sync filter receives its first chunk of input, a + synchronization period of the given number of milliseconds + begins. Playback is deferred until a synchronization packet + has been received from each defined buddy, or until the end + of the period. Buddies which did not send a synchronization + packet in time are temporarily disabled and are not waited for + during subsequent synchronization periods. They are re-enabled + automatically when another synchronization packet arrives. +" diff --git a/net.c b/net.c index 4a31f013..986660fa 100644 --- a/net.c +++ b/net.c @@ -12,6 +12,11 @@ */ #define _GNU_SOURCE +#include +#include +#include +#include +#include #include /* At least NetBSD needs these. */ @@ -142,9 +147,9 @@ static bool host_string_ok(const char *host) * \param hostlen The maximum length of \a host. * \param port To return the port number (if any) of \a url. * - * \return Pointer to \a host, or NULL if failed. - * If NULL is returned, \a host and \a portnum are undefined. If no - * port number was present in \a url, \a portnum is set to -1. + * \return Pointer to \a host, or \p NULL if failed. If \p NULL is returned, + * \a host and \a port are undefined. If no port number was present in \a url, + * \a port is set to -1. * * \sa RFC 3986, 3.2.2/3.2.3 */ @@ -168,16 +173,16 @@ char *parse_url(const char *url, if (*o++ != ']' || (*o != '\0' && *o != ':')) goto failed; } else { - for (; (*c = *o == ':'? '\0' : *o); c++, o++) - if (c == end) + for (; (*c = *o == ':'? '\0' : *o); c++, o++) { + if (c == end && o[1]) goto failed; + } } if (*o == ':') if (para_atoi32(++o, port) < 0 || *port < 0 || *port > 0xffff) goto failed; - if (host_string_ok(host)) return host; failed: @@ -328,7 +333,14 @@ static void flowopt_setopts(int sockfd, struct flowopts *fo) } } -static void flowopt_cleanup(struct flowopts *fo) +/** + * Deallocate all resources of a flowopts structure. + * + * \param fo A pointer as returned from flowopt_new(). + * + * It's OK to pass \p NULL here in which case the function does nothing. + */ +void flowopt_cleanup(struct flowopts *fo) { struct pre_conn_opt *cur, *next; @@ -344,137 +356,148 @@ static void flowopt_cleanup(struct flowopts *fo) } /** - * Resolve IPv4/IPv6 address and create a ready-to-use active or passive socket. + * Resolve an IPv4/IPv6 address. * * \param l4type The layer-4 type (\p IPPROTO_xxx). - * \param passive Whether this is a passive (1) or active (0) socket. + * \param passive Whether \p AI_PASSIVE should be included as hint. * \param host Remote or local hostname or IPv/6 address string. - * \param port_number Decimal port number. - * \param fo Socket options to be set before making the connection. + * \param port_number Used to set the port in each returned address structure. + * \param result addrinfo structures are returned here. * - * This creates a ready-made IPv4/v6 socket structure after looking up the - * necessary parameters. The interpretation of \a host depends on the value of - * \a passive: - * - on a passive socket host is interpreted as an interface IPv4/6 address - * (can be left NULL); - * - on an active socket, \a host is the peer DNS name or IPv4/6 address - * to connect to; - * - \a port_number is in either case the numeric port number (not service - * string). - * - * Furthermore, bind(2) is called on passive sockets, and connect(2) on active - * sockets. The algorithm tries all possible address combinations until it - * succeeds. If \a fo is supplied, options are set and cleanup is performed. - * - * \return This function returns 1 on success and \a -E_ADDRESS_LOOKUP when no - * matching connection could be set up (with details in the error log). - * - * \sa ipv6(7), getaddrinfo(3), bind(2), connect(2). - */ -int makesock(unsigned l4type, bool passive, - const char *host, uint16_t port_number, - struct flowopts *fo) -{ - struct addrinfo *local = NULL, *src = NULL, *remote = NULL, - *dst = NULL, hints; - unsigned int l3type = AF_UNSPEC; - int rc, on = 1, sockfd = -1, - socktype = sock_type(l4type); + * The interpretation of \a host depends on the value of \a passive. On a + * passive socket host is interpreted as an interface IPv4/6 address (can be + * left NULL). On an active socket, \a host is the peer DNS name or IPv4/6 + * address to connect to. + * + * \return Standard. + * + * \sa getaddrinfo(3). + */ +int lookup_address(unsigned l4type, bool passive, const char *host, + int port_number, struct addrinfo **result) +{ + int ret; char port[6]; /* port number has at most 5 digits */ + struct addrinfo *addr = NULL, hints; - sprintf(port, "%u", port_number); + *result = NULL; + sprintf(port, "%u", port_number & 0xffff); /* Set up address hint structure */ memset(&hints, 0, sizeof(hints)); - hints.ai_family = l3type; - hints.ai_socktype = socktype; - /* + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = sock_type(l4type); + /* * getaddrinfo does not support SOCK_DCCP, so for the sake of lookup * (and only then) pretend to be UDP. */ if (l4type == IPPROTO_DCCP) hints.ai_socktype = SOCK_DGRAM; - /* only use addresses available on the host */ hints.ai_flags = AI_ADDRCONFIG; - if (l3type == AF_INET6) - /* use v4-mapped-v6 if no v6 addresses found */ - hints.ai_flags |= AI_V4MAPPED | AI_ALL; - if (passive && host == NULL) hints.ai_flags |= AI_PASSIVE; - /* Obtain local/remote address information */ - if ((rc = getaddrinfo(host, port, &hints, passive ? &local : &remote))) { - PARA_ERROR_LOG("can not resolve %s address %s#%s: %s.\n", - layer4_name(l4type), - host? host : (passive? "[loopback]" : "[localhost]"), - port, gai_strerror(rc)); - rc = -E_ADDRESS_LOOKUP; - goto out; + ret = getaddrinfo(host, port, &hints, &addr); + if (ret != 0) { + PARA_ERROR_LOG("can not resolve %s address %s#%s: %s\n", + layer4_name(l4type), + host? host : (passive? "[loopback]" : "[localhost]"), + port, gai_strerror(ret)); + return -E_ADDRESS_LOOKUP; } + *result = addr; + return 1; +} - /* Iterate over all src/dst combination, exhausting dst first */ - for (src = local, dst = remote; src != NULL || dst != NULL; /* no op */ ) { - if (src && dst && src->ai_family == AF_INET - && dst->ai_family == AF_INET6) - goto get_next_dst; /* v4 -> v6 is not possible */ - - sockfd = socket(src ? src->ai_family : dst->ai_family, - socktype, l4type); - if (sockfd < 0) - goto get_next_dst; +/** + * Create an active or passive socket. + * + * \param l4type \p IPPROTO_TCP, \p IPPROTO_UDP, or \p IPPROTO_DCCP. + * \param passive Whether to call bind(2) or connect(2). + * \param ai Address information as obtained from \ref lookup_address(). + * \param fo Socket options to be set before making the connection. + * + * bind(2) is called on passive sockets, and connect(2) on active sockets. The + * algorithm tries all possible address combinations until it succeeds. If \a + * fo is supplied, options are set but cleanup must be performed in the caller. + * + * \return File descriptor on success, \p E_MAKESOCK on errors. + * + * \sa \ref lookup_address(), \ref makesock(), ip(7), ipv6(7), bind(2), + * connect(2). + */ +int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai, + struct flowopts *fo) +{ + int ret = -E_MAKESOCK, on = 1; + for (; ai; ai = ai->ai_next) { + int fd; + ret = socket(ai->ai_family, sock_type(l4type), l4type); + if (ret < 0) + continue; + fd = ret; + flowopt_setopts(fd, fo); + if (!passive) { + if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0) + return fd; + close(fd); + continue; + } /* * Reuse the address on passive sockets to avoid failure on * restart (protocols using listen()) and when creating * multiple listener instances (UDP multicast). */ - if (passive && setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, - &on, sizeof(on)) == -1) { - rc = errno; - close(sockfd); - PARA_ERROR_LOG("can not set SO_REUSEADDR: %s\n", - strerror(rc)); - rc = -ERRNO_TO_PARA_ERROR(rc); - break; - } - flowopt_setopts(sockfd, fo); - - if (src) { - if (bind(sockfd, src->ai_addr, src->ai_addrlen) < 0) { - close(sockfd); - goto get_next_src; - } - if (!dst) /* bind-only completed successfully */ - break; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, + sizeof(on)) == -1) { + close(fd); + continue; } - - if (dst && connect(sockfd, dst->ai_addr, dst->ai_addrlen) == 0) - break; /* connection completed successfully */ - close(sockfd); -get_next_dst: - if (dst && (dst = dst->ai_next)) + if (bind(fd, ai->ai_addr, ai->ai_addrlen) < 0) { + close(fd); continue; -get_next_src: - if (src && (src = src->ai_next)) /* restart inner loop */ - dst = remote; + } + return fd; } -out: - if (local) - freeaddrinfo(local); - if (remote) - freeaddrinfo(remote); - flowopt_cleanup(fo); - - if (src == NULL && dst == NULL) { - if (rc >= 0) - rc = -E_MAKESOCK; - PARA_ERROR_LOG("can not create %s socket %s#%s.\n", - layer4_name(l4type), host? host : (passive? - "[loopback]" : "[localhost]"), port); - return rc; + return -E_MAKESOCK; +} + +/** + * Resolve IPv4/IPv6 address and create a ready-to-use active or passive socket. + * + * \param l4type The layer-4 type (\p IPPROTO_xxx). + * \param passive Whether this is a passive or active socket. + * \param host Passed to \ref lookup_address(). + * \param port_number Passed to \ref lookup_address(). + * \param fo Passed to \ref makesock_addrinfo(). + * + * This creates a ready-made IPv4/v6 socket structure after looking up the + * necessary parameters. The function first calls \ref lookup_address() and + * passes the address information to makesock_addrinfo() to create and + * initialize the socket. + * + * \return The newly created file descriptor on success, a negative error code + * on failure. + * + * \sa \ref lookup_address(), \ref makesock_addrinfo(). + */ +int makesock(unsigned l4type, bool passive, const char *host, uint16_t port_number, + struct flowopts *fo) +{ + struct addrinfo *ai; + int ret = lookup_address(l4type, passive, host, port_number, &ai); + + if (ret >= 0) + ret = makesock_addrinfo(l4type, passive, ai, fo); + if (ai) + freeaddrinfo(ai); + if (ret < 0) { + PARA_ERROR_LOG("can not create %s socket %s#%d.\n", + layer4_name(l4type), host? host : (passive? + "[loopback]" : "[localhost]"), port_number); } - return sockfd; + return ret; } /** @@ -687,6 +710,32 @@ void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia) *ia = ((struct sockaddr_in *)sa)->sin_addr; } +/** + * Compare the address part of IPv4/6 addresses. + * + * \param sa1 First address. + * \param sa2 Second address. + * + * \return True iff the IP address of \a sa1 and \a sa2 match. + */ +bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2) +{ + if (!sa1 || !sa2) + return false; + if (sa1->sa_family != sa2->sa_family) + return false; + if (sa1->sa_family == AF_INET) { + struct sockaddr_in *a1 = (typeof(a1))sa1, + *a2 = (typeof (a2))sa2; + return a1->sin_addr.s_addr == a2->sin_addr.s_addr; + } else if (sa1->sa_family == AF_INET6) { + struct sockaddr_in6 *a1 = (typeof(a1))sa1, + *a2 = (typeof (a2))sa2; + return !memcmp(a1, a2, sizeof(*a1)); + } else + return false; +} + /** * Receive data from a file descriptor. * diff --git a/net.h b/net.h index 0003fa9d..e249498e 100644 --- a/net.h +++ b/net.h @@ -61,6 +61,7 @@ struct flowopts; extern struct flowopts *flowopt_new(void); extern void flowopt_add(struct flowopts *fo, int level, int opt, const char *name, const void *val, int len); +void flowopt_cleanup(struct flowopts *fo); /** Flowopt shortcut macros */ #define OPT_ADD(fo, lev, opt, val, len) flowopt_add(fo, lev, opt, #opt, val, len) @@ -101,12 +102,17 @@ _static_inline_ bool is_valid_ipv6_address(const char *address) return inet_pton(AF_INET6, address, &test_it) != 0; } +int lookup_address(unsigned l4type, bool passive, const char *host, + int port_number, struct addrinfo **result); + /** * Generic socket creation (passive and active sockets). */ -extern int makesock(unsigned l4type, bool passive, - const char *host, uint16_t port_number, - struct flowopts *fo); +int makesock(unsigned l4type, bool passive, const char *host, + uint16_t port_number, struct flowopts *fo); + +int makesock_addrinfo(unsigned l4type, bool passive, struct addrinfo *ai, + struct flowopts *fo); static inline int para_connect_simple(unsigned l4type, const char *host, uint16_t port) @@ -115,6 +121,7 @@ static inline int para_connect_simple(unsigned l4type, } void extract_v4_addr(const struct sockaddr_storage *ss, struct in_addr *ia); +bool sockaddr_equal(const struct sockaddr *sa1, const struct sockaddr *sa2); /** * Functions to support listening sockets. diff --git a/para.h b/para.h index 29658f47..d1e266fc 100644 --- a/para.h +++ b/para.h @@ -19,13 +19,11 @@ #include #include #include -#include -#include -#include -#include /* needed by create_pf_socket */ #include #include #include +#include +#include #include "gcc-compat.h" /** used in various contexts */ diff --git a/send_common.c b/send_common.c index 250a2a0b..a16869b0 100644 --- a/send_common.c +++ b/send_common.c @@ -6,8 +6,13 @@ /** \file send_common.c Functions used by more than one paraslash sender. */ +#include +#include #include #include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/server.c b/server.c index 70d9137e..f92ef551 100644 --- a/server.c +++ b/server.c @@ -63,9 +63,15 @@ * - Forward error correction: \ref fec.c. */ +#include +#include #include #include #include +#include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/sync_filter.c b/sync_filter.c new file mode 100644 index 00000000..379b54b1 --- /dev/null +++ b/sync_filter.c @@ -0,0 +1,406 @@ +/* + * Copyright (C) 2013 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file sync_filter.c Playback synchronization filter. */ + +#include +#include +#include +#include +#include +#include +#include + +#include "para.h" +#include "sync_filter.cmdline.h" +#include "list.h" +#include "net.h" +#include "sched.h" +#include "ggo.h" +#include "buffer_tree.h" +#include "filter.h" +#include "string.h" +#include "fd.h" +#include "error.h" + +struct sync_buddy_info { + const char *url; + char *host; + int port; + struct addrinfo *ai; + bool disabled; +}; + +/* per open/close data */ +struct sync_buddy { + int fd; + struct sync_buddy_info *sbi; + bool ping_received; + struct list_head node; +}; + +struct sync_filter_context { + int listen_fd; + struct list_head buddies; + struct timeval timeout; + bool ping_sent; +}; + +struct sync_filter_config { + struct sync_filter_args_info *conf; + struct sync_buddy_info *buddy_info; +}; + +#define FOR_EACH_BUDDY(_buddy, _list) \ + list_for_each_entry(_buddy, _list, node) +#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \ + list_for_each_entry_safe(_buddy, _tmp_buddy, _list, node) + +static void sync_close_buddy(struct sync_buddy *buddy) +{ + if (buddy->fd < 0) + return; + PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url); + close(buddy->fd); + buddy->fd = -1; +} + +static void sync_close_buddies(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, &ctx->buddies) + sync_close_buddy(buddy); +} + +static void sync_close(struct filter_node *fn) +{ + struct sync_filter_context *ctx = fn->private_data; + + sync_close_buddies(ctx); + if (ctx->listen_fd >= 0) { + close(ctx->listen_fd); + ctx->listen_fd = -1; + } + free(ctx); + fn->private_data = NULL; +} + +static void sync_free_config(void *conf) +{ + struct sync_filter_config *sfc = conf; + int i; + + for (i = 0; i < sfc->conf->buddy_given; i++) { + free(sfc->buddy_info[i].host); + freeaddrinfo(sfc->buddy_info[i].ai); + } + sync_filter_cmdline_parser_free(sfc->conf); + free(sfc); +} + +static void sync_open(struct filter_node *fn) +{ + int i, ret; + struct sync_filter_config *sfc = fn->conf; + struct sync_buddy *buddy; + struct sync_filter_context *ctx; + + assert(sfc); + + ctx = fn->private_data = para_calloc(sizeof(*ctx)); + INIT_LIST_HEAD(&ctx->buddies); + ctx->listen_fd = -1; + + /* create socket to listen for incoming packets */ + ret = makesock( + IPPROTO_UDP, + true /* passive */, + NULL /* no host required */, + sfc->conf->port_arg, + NULL /* no flowopts */ + ); + if (ret < 0) { + PARA_ERROR_LOG("could not create UDP listening socket %d\n", + sfc->conf->port_arg); + return; + } + ctx->listen_fd = ret; + PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd); + + for (i = 0; i < sfc->conf->buddy_given; i++) { + struct sync_buddy_info *sbi = sfc->buddy_info + i; + const char *url = sfc->conf->buddy_arg[i]; + int fd; + + /* make buddy udp socket from address info */ + assert(sbi->ai); + ret = makesock_addrinfo( + IPPROTO_UDP, + false /* not passive */, + sbi->ai, + NULL /* no flowopts */ + ); + if (ret < 0) { + PARA_WARNING_LOG("could not make socket for %s\n", + url); + goto fail; + } + fd = ret; + ret = mark_fd_nonblocking(fd); + if (ret < 0) { + PARA_ERROR_LOG("unable to set nonblock mode for %s\n", + url); + close(fd); + goto fail; + } + buddy = para_malloc(sizeof(*buddy)); + buddy->fd = fd; + buddy->sbi = sbi; + buddy->ping_received = false; + para_list_add(&buddy->node, &ctx->buddies); + + PARA_INFO_LOG("opened buddy %s on fd %d\n", url, fd); + continue; +fail: + PARA_WARNING_LOG("%s\n", para_strerror(-ret)); + } +} + +static int sync_parse_config(int argc, char **argv, void **result) +{ + int i, ret, n; + struct sync_filter_config *sfc; + struct sync_filter_args_info *conf = para_malloc(sizeof(*conf)); + + sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */ + sfc = para_calloc(sizeof(*sfc)); + sfc->conf = conf; + n = conf->buddy_given; + sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info)); + PARA_INFO_LOG("initializing buddy info array of length %d\n", n); + for (i = 0; i < n; i++) { + const char *url = conf->buddy_arg[i]; + size_t len = strlen(url); + char *host = para_malloc(len + 1); + int port; + struct addrinfo *ai; + struct sync_buddy_info *sbi = sfc->buddy_info + i; + + if (!parse_url(url, host, len, &port)) { + free(host); + PARA_ERROR_LOG("could not parse url %s\n", url); + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + goto fail; + } + if (port < 0) + port = conf->port_arg; + ret = lookup_address(IPPROTO_UDP, false /* not passive */, + host, port, &ai); + if (ret < 0) { + PARA_ERROR_LOG("host lookup failure for %s\n", url); + free(host); + goto fail; + } + sbi->url = url; + sbi->host = host; + sbi->port = port; + sbi->ai = ai; + sbi->disabled = false; + PARA_DEBUG_LOG("buddy #%d: %s\n", i, url); + } + *result = sfc; + return 1; +fail: + assert(ret < 0); + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + sync_free_config(sfc); + return ret; +} + +/* + * True if we sent a packet to all budies and received a packet from each + * enabled buddy. + */ +static bool sync_complete(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + if (!ctx->ping_sent) + return false; + FOR_EACH_BUDDY(buddy, &ctx->buddies) + if (!buddy->sbi->disabled && !buddy->ping_received) + return false; + return true; +} + +static void sync_disable_active_buddies(struct sync_filter_context *ctx) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, &ctx->buddies) { + if (buddy->sbi->disabled) + continue; + PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url); + buddy->sbi->disabled = true; + } +} + +static void sync_set_timeout(struct sync_filter_context *ctx, + struct sync_filter_config *sfc) +{ + struct timeval to; + + ms2tv(sfc->conf->timeout_arg, &to); + tv_add(now, &to, &ctx->timeout); +} + +static void sync_pre_select(struct sched *s, struct task *t) +{ + int ret; + struct filter_node *fn = container_of(t, struct filter_node, task); + struct sync_filter_context *ctx = fn->private_data; + struct sync_filter_config *sfc = fn->conf; + + if (list_empty(&ctx->buddies)) + return sched_min_delay(s); + if (ctx->listen_fd < 0) + return sched_min_delay(s); + ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL); + if (ret < 0) + return sched_min_delay(s); + para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno); + if (ret == 0) + return; + if (ctx->timeout.tv_sec == 0) { /* must ping buddies */ + sync_set_timeout(ctx, sfc); + return sched_min_delay(s); + } + if (sync_complete(ctx)) /* push down what we have */ + return sched_min_delay(s); + sched_request_barrier_or_min_delay(&ctx->timeout, s); +} + +static struct sync_buddy *sync_find_buddy(struct sockaddr *addr, + struct list_head *list) +{ + struct sync_buddy *buddy; + + FOR_EACH_BUDDY(buddy, list) + if (sockaddr_equal(buddy->sbi->ai->ai_addr, addr)) + return buddy; + return NULL; +} + +static int sync_post_select(__a_unused struct sched *s, struct task *t) +{ + int ret; + struct filter_node *fn = container_of(t, struct filter_node, task); + struct sync_filter_context *ctx = fn->private_data; + struct sync_filter_config *sfc = fn->conf; + struct sync_buddy *buddy, *tmp; + + if (list_empty(&ctx->buddies)) + goto success; + ret = -E_SYNC_LISTEN_FD; + if (ctx->listen_fd < 0) + goto fail; + ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL); + if (ret < 0) + goto fail; + if (ret == 0) + return 0; + if (ctx->timeout.tv_sec == 0) + sync_set_timeout(ctx, sfc); + else { + if (tv_diff(&ctx->timeout, now, NULL) < 0) { + sync_disable_active_buddies(ctx); + goto success; + } + } + if (!ctx->ping_sent) { + FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies) { + char c = '\0'; + PARA_INFO_LOG("pinging %s (%s)\n", + buddy->sbi->url, buddy->sbi->disabled? + "disabled" : "enabled"); + ret = xwrite(buddy->fd, &c, 1); + sync_close_buddy(buddy); + if (ret < 0) { + PARA_WARNING_LOG("failed to write to %s: %s\n", + buddy->sbi->url, para_strerror(-ret)); + list_del(&buddy->node); + } + } + ctx->ping_sent = true; + } + if (FD_ISSET(ctx->listen_fd, &s->rfds)) { + char c; + for (;;) { + struct sockaddr src_addr; + socklen_t len = sizeof(src_addr); + ret = recvfrom(ctx->listen_fd, &c, 1, MSG_DONTWAIT, + &src_addr, &len); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + ret = -ERRNO_TO_PARA_ERROR(errno); + goto fail; + } + buddy = sync_find_buddy(&src_addr, &ctx->buddies); + if (!buddy) { + PARA_NOTICE_LOG("pinged by unknown\n"); + continue; + } + PARA_DEBUG_LOG("pinged by %s\n", buddy->sbi->url); + if (buddy->sbi->disabled) { + PARA_NOTICE_LOG("enabling %s\n", + buddy->sbi->url); + buddy->sbi->disabled = false; + } + list_del(&buddy->node); + } + } + if (!sync_complete(ctx)) + return 1; + /* + * Although all enabled buddies are in sync we do not splice out + * ourselves immediately. We rather wait until the timout expires, + * or the buddy list has become empty. This opens a time window + * for disabled buddies to become enabled by sending us a packet. + */ + btr_pushdown(fn->btrn); + return 1; +success: + ret = -E_SYNC_COMPLETE; /* success */ + goto out; +fail: + PARA_WARNING_LOG("%s\n", para_strerror(-ret)); +out: + sync_close_buddies(ctx); + btr_splice_out_node(&fn->btrn); + assert(ret < 0); + return ret; +} + +/** + * The synchronization filter. + * + * \param f Pointer to the struct to initialize. + */ +void sync_filter_init(struct filter *f) +{ + struct sync_filter_args_info dummy; + + sync_filter_cmdline_parser_init(&dummy); + f->open = sync_open; + f->close = sync_close; + f->pre_select = sync_pre_select; + f->post_select = sync_post_select; + f->parse_config = sync_parse_config; + f->free_config = sync_free_config; + f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter); +} diff --git a/udp_recv.c b/udp_recv.c index 1648cad8..96b6c731 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -5,9 +5,14 @@ */ /** \file udp_recv.c Paraslash's udp receiver */ +#include #include #include #include +#include +#include +#include +#include #include "para.h" #include "error.h" diff --git a/udp_send.c b/udp_send.c index 7930f092..4580f009 100644 --- a/udp_send.c +++ b/udp_send.c @@ -6,13 +6,17 @@ /** \file udp_send.c Para_server's udp sender. */ - +#include +#include #include #include #include #include #include +#include +#include #include +#include #include "server.cmdline.h" #include "para.h" diff --git a/vss.c b/vss.c index 06707d6c..2349b093 100644 --- a/vss.c +++ b/vss.c @@ -11,8 +11,14 @@ * senders. */ +#include +#include #include #include +#include +#include +#include +#include #include "para.h" #include "error.h"