-/*
- * Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
+/* Copyright (C) 2013 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
/** \file sync_filter.c Playback synchronization filter. */
#include <arpa/inet.h>
#include <sys/un.h>
#include <netdb.h>
+#include <lopsub.h>
+#include "filter_cmd.lsg.h"
#include "para.h"
-#include "sync_filter.cmdline.h"
#include "list.h"
#include "net.h"
#include "sched.h"
-#include "ggo.h"
#include "buffer_tree.h"
#include "filter.h"
#include "string.h"
bool disabled;
};
-/* per open/close data */
+/* One active buddy */
struct sync_buddy {
int fd;
struct sync_buddy_info *sbi;
struct list_head node;
};
+/* Allocated in ->open(), stored in fn->private_data */
struct sync_filter_context {
int listen_fd;
struct list_head buddies;
bool ping_sent;
};
-struct sync_filter_config {
- struct sync_filter_args_info *conf;
- struct sync_buddy_info *buddy_info;
-};
-
#define FOR_EACH_BUDDY(_buddy, _list) \
list_for_each_entry(_buddy, _list, node)
#define FOR_EACH_BUDDY_SAFE(_buddy, _tmp_buddy, _list) \
static void sync_close_buddy(struct sync_buddy *buddy)
{
- if (buddy->fd < 0)
- return;
- PARA_DEBUG_LOG("closing %s\n", buddy->sbi->url);
+ PARA_DEBUG_LOG("closing %s, fd %d\n", buddy->sbi->url, buddy->fd);
close(buddy->fd);
- buddy->fd = -1;
+ list_del(&buddy->node);
+ free(buddy);
}
static void sync_close_buddies(struct sync_filter_context *ctx)
{
- struct sync_buddy *buddy;
+ struct sync_buddy *buddy, *tmp;
- FOR_EACH_BUDDY(buddy, &ctx->buddies)
+ FOR_EACH_BUDDY_SAFE(buddy, tmp, &ctx->buddies)
sync_close_buddy(buddy);
}
fn->private_data = NULL;
}
-static void sync_free_config(void *conf)
+static void sync_teardown(const struct lls_parse_result *lpr, void *conf)
{
- struct sync_filter_config *sfc = conf;
- int i;
+ struct sync_buddy_info *sbi = conf;
+ int i, num_buddies = FILTER_CMD_OPT_GIVEN(SYNC, BUDDY, lpr);
- for (i = 0; i < sfc->conf->buddy_given; i++) {
- free(sfc->buddy_info[i].host);
- freeaddrinfo(sfc->buddy_info[i].ai);
+ for (i = 0; i < num_buddies; i++) {
+ free(sbi[i].host);
+ freeaddrinfo(sbi[i].ai);
}
- sync_filter_cmdline_parser_free(sfc->conf);
- free(sfc);
+ free(sbi);
}
static void sync_open(struct filter_node *fn)
{
int i, ret;
- struct sync_filter_config *sfc = fn->conf;
struct sync_buddy *buddy;
struct sync_filter_context *ctx;
-
- assert(sfc);
+ struct sync_buddy_info *sbi = fn->conf;
+ uint32_t port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, fn->lpr);
+ unsigned buddy_given;
+ const struct lls_opt_result *r_b;
ctx = fn->private_data = para_calloc(sizeof(*ctx));
- INIT_LIST_HEAD(&ctx->buddies);
- ctx->listen_fd = -1;
+ init_list_head(&ctx->buddies);
/* create socket to listen for incoming packets */
ret = makesock(
IPPROTO_UDP,
true /* passive */,
NULL /* no host required */,
- sfc->conf->port_arg,
+ port,
NULL /* no flowopts */
);
if (ret < 0) {
- PARA_ERROR_LOG("could not create UDP listening socket %d\n",
- sfc->conf->port_arg);
+ PARA_ERROR_LOG("could not create UDP listening socket %u\n",
+ port);
return;
}
ctx->listen_fd = ret;
PARA_INFO_LOG("listening on fd %d\n", ctx->listen_fd);
- for (i = 0; i < sfc->conf->buddy_given; i++) {
- struct sync_buddy_info *sbi = sfc->buddy_info + i;
- const char *url = sfc->conf->buddy_arg[i];
+ r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, fn->lpr);
+ buddy_given = lls_opt_given(r_b);
+ for (i = 0; i < buddy_given; i++) {
int fd;
+ const char *url = lls_string_val(i, r_b);
/* make buddy udp socket from address info */
assert(sbi->ai);
ret = makesock_addrinfo(
IPPROTO_UDP,
false /* not passive */,
- sbi->ai,
+ sbi[i].ai,
NULL /* no flowopts */
);
if (ret < 0) {
}
buddy = para_malloc(sizeof(*buddy));
buddy->fd = fd;
- buddy->sbi = sbi;
+ buddy->sbi = sbi + i;
buddy->ping_received = false;
para_list_add(&buddy->node, &ctx->buddies);
}
}
-static int sync_parse_config(int argc, char **argv, void **result)
+/*
+ * Build an array of struct sync_buddy_info with one entry for each buddy given
+ * in the arguments. This array is not affected by sync_close(), so information
+ * stored there can be used for multiple instances (para_audiod). We store the
+ * resolved url and the ->disabled bit in this array.
+ */
+static void *sync_setup(const struct lls_parse_result *lpr)
{
- int i, ret, n;
- struct sync_filter_config *sfc;
- struct sync_filter_args_info *conf = para_malloc(sizeof(*conf));
-
- sync_filter_cmdline_parser(argc, argv, conf); /* exits on error */
- sfc = para_calloc(sizeof(*sfc));
- sfc->conf = conf;
- n = conf->buddy_given;
- sfc->buddy_info = para_malloc((n + 1) * sizeof(*sfc->buddy_info));
- PARA_INFO_LOG("initializing buddy info array of length %d\n", n);
+ int i, ret;
+ unsigned n;
+ struct sync_buddy_info *sbi;
+ const struct lls_opt_result *r_b;
+
+ r_b = FILTER_CMD_OPT_RESULT(SYNC, BUDDY, lpr);
+ n = lls_opt_given(r_b);
+ sbi = para_malloc(n * sizeof(*sbi));
+ PARA_INFO_LOG("initializing buddy info array of length %u\n", n);
for (i = 0; i < n; i++) {
- const char *url = conf->buddy_arg[i];
+ const char *url = lls_string_val(i, r_b);
size_t len = strlen(url);
char *host = para_malloc(len + 1);
int port;
struct addrinfo *ai;
- struct sync_buddy_info *sbi = sfc->buddy_info + i;
if (!parse_url(url, host, len, &port)) {
- free(host);
PARA_ERROR_LOG("could not parse url %s\n", url);
- ret = -ERRNO_TO_PARA_ERROR(EINVAL);
- goto fail;
+ exit(EXIT_FAILURE);
}
if (port < 0)
- port = conf->port_arg;
+ port = FILTER_CMD_OPT_UINT32_VAL(SYNC, PORT, lpr);
ret = lookup_address(IPPROTO_UDP, false /* not passive */,
host, port, &ai);
if (ret < 0) {
- PARA_ERROR_LOG("host lookup failure for %s\n", url);
- free(host);
- goto fail;
+ PARA_ERROR_LOG("host lookup failure for %s: %s\n",
+ url, para_strerror(-ret));
+ exit(EXIT_FAILURE);
}
- sbi->url = url;
- sbi->host = host;
- sbi->port = port;
- sbi->ai = ai;
- sbi->disabled = false;
+ sbi[i].url = url;
+ sbi[i].host = host;
+ sbi[i].port = port;
+ sbi[i].ai = ai;
+ sbi[i].disabled = false;
PARA_DEBUG_LOG("buddy #%d: %s\n", i, url);
}
- *result = sfc;
- return 1;
-fail:
- assert(ret < 0);
- PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- sync_free_config(sfc);
- return ret;
+ return sbi;
}
/*
- * True if we sent a packet to all budies and received a packet from each
+ * True if we sent a packet to all buddies and received a packet from each
* enabled buddy.
*/
static bool sync_complete(struct sync_filter_context *ctx)
FOR_EACH_BUDDY(buddy, &ctx->buddies) {
if (buddy->sbi->disabled)
continue;
+ if (buddy->ping_received == true)
+ continue;
PARA_NOTICE_LOG("disabling %s\n", buddy->sbi->url);
buddy->sbi->disabled = true;
}
}
static void sync_set_timeout(struct sync_filter_context *ctx,
- struct sync_filter_config *sfc)
+ struct lls_parse_result *lpr)
{
+ uint32_t ms = FILTER_CMD_OPT_UINT32_VAL(SYNC, TIMEOUT, lpr);
struct timeval to;
- ms2tv(sfc->conf->timeout_arg, &to);
+ ms2tv(ms, &to);
tv_add(now, &to, &ctx->timeout);
}
int ret;
struct filter_node *fn = context;
struct sync_filter_context *ctx = fn->private_data;
- struct sync_filter_config *sfc = fn->conf;
if (list_empty(&ctx->buddies))
return sched_min_delay(s);
if (ret == 0)
return;
if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
- sync_set_timeout(ctx, sfc);
+ sync_set_timeout(ctx, fn->lpr);
return sched_min_delay(s);
}
if (sync_complete(ctx)) /* push down what we have */
int ret;
struct filter_node *fn = context;
struct sync_filter_context *ctx = fn->private_data;
- struct sync_filter_config *sfc = fn->conf;
struct sync_buddy *buddy, *tmp;
if (list_empty(&ctx->buddies))
if (ret == 0)
return 0;
if (ctx->timeout.tv_sec == 0)
- sync_set_timeout(ctx, sfc);
+ sync_set_timeout(ctx, fn->lpr);
else {
if (tv_diff(&ctx->timeout, now, NULL) < 0) {
sync_disable_active_buddies(ctx);
buddy->sbi->url, buddy->sbi->disabled?
"disabled" : "enabled");
ret = xwrite(buddy->fd, &c, 1);
- sync_close_buddy(buddy);
if (ret < 0) {
PARA_WARNING_LOG("failed to write to %s: %s\n",
buddy->sbi->url, para_strerror(-ret));
- list_del(&buddy->node);
+ sync_close_buddy(buddy);
}
}
ctx->ping_sent = true;
buddy->sbi->url);
buddy->sbi->disabled = false;
}
- list_del(&buddy->node);
+ buddy->ping_received = true;
}
}
if (!sync_complete(ctx))
return 1;
/*
* Although all enabled buddies are in sync we do not splice out
- * ourselves immediately. We rather wait until the timout expires,
+ * ourselves immediately. We rather wait until the timeout expires,
* or the buddy list has become empty. This opens a time window
* for disabled buddies to become enabled by sending us a packet.
*/
ret = -E_SYNC_COMPLETE; /* success */
goto out;
fail:
- PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+ if (ret != -E_BTR_EOF)
+ PARA_WARNING_LOG("%s\n", para_strerror(-ret));
out:
sync_close_buddies(ctx);
btr_splice_out_node(&fn->btrn);
return ret;
}
-/**
- * The synchronization filter.
- *
- * \param f Pointer to the struct to initialize.
- */
-void sync_filter_init(struct filter *f)
-{
- struct sync_filter_args_info dummy;
-
- sync_filter_cmdline_parser_init(&dummy);
- f->open = sync_open;
- f->close = sync_close;
- f->pre_select = sync_pre_select;
- f->post_select = sync_post_select;
- f->parse_config = sync_parse_config;
- f->free_config = sync_free_config;
- f->help = (struct ggo_help)DEFINE_GGO_HELP(sync_filter);
-}
+const struct filter lsg_filter_cmd_com_sync_user_data = {
+ .setup = sync_setup,
+ .open = sync_open,
+ .pre_select = sync_pre_select,
+ .post_select = sync_post_select,
+ .close = sync_close,
+ .teardown = sync_teardown
+};