0.4.5 (to be announced) "symmetric randomization"
-------------------------------------------------
+Bug fixes, internal cleanups and variable-sized FEC slices.
+
- Contains a fix for an invalid-free-bug in the ogg audio format
handler code.
- - Improved documentation and error diagnostics.
- Switching off the DCCP sender works again.
- para_audiod handles crashes of para_server more robustly.
- - Internal scheduler cleanups.
+ - Internal scheduler and writer cleanups.
+ - Reduced latency due to variable-sized FEC slices.
+ - Improved documentation and error diagnostics.
+ - The build of para_server is now optional, allowing the build
+ to succeed in case libosl is not installed.
------------------------------------------
0.4.4 (2010-08-06) "persistent regularity"
struct private_alsa_write_data {
/** The alsa handle */
snd_pcm_t *handle;
- /** Determined and set by alsa_open(). */
+ /** Determined and set by alsa_init(). */
int bytes_per_frame;
/** The approximate maximum buffer duration in us. */
unsigned buffer_time;
return 1;
}
-/* Open an instance of the alsa writer. */
-static int alsa_open(struct writer_node *wn)
-{
- wn->private_data = para_calloc(sizeof(struct private_alsa_write_data));
- return 1;
-}
-
static void alsa_write_pre_select(struct sched *s, struct task *t)
{
struct writer_node *wn = container_of(t, struct writer_node, task);
struct private_alsa_write_data *pad = wn->private_data;
struct timeval tv;
snd_pcm_sframes_t avail, underrun;
- int ret;
+ int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- if (!pad->handle)
- return;
- ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- if (ret < 0)
- sched_request_timeout_ms(20, s);
- if (ret <= 0)
+ if (ret == 0)
return;
+ if (ret < 0 || !pad)
+ return sched_min_delay(s);
/*
* Data is available to be written to the alsa handle. Compute number
* of milliseconds until next buffer underrun would occur.
struct private_alsa_write_data *pad = wn->private_data;
PARA_INFO_LOG("closing writer node %p\n", wn);
- if (pad->handle) {
- /*
- * It's OK to have a blocking operation here because we already
- * made sure that the PCM output buffer is (nearly) empty.
- */
- snd_pcm_nonblock(pad->handle, 0);
- snd_pcm_drain(pad->handle);
- snd_pcm_close(pad->handle);
- snd_config_update_free_global();
- }
+ if (!pad)
+ return;
+ /*
+ * It's OK to have a blocking operation here because we already made
+ * sure that the PCM output buffer is (nearly) empty.
+ */
+ snd_pcm_nonblock(pad->handle, 0);
+ snd_pcm_drain(pad->handle);
+ snd_pcm_close(pad->handle);
+ snd_config_update_free_global();
free(pad);
}
return;
btr_merge(btrn, wn->min_iqs);
bytes = btr_next_buffer(btrn, &data);
- if (ret < 0 || bytes < pad->bytes_per_frame) { /* eof */
+ if (ret < 0 || bytes < wn->min_iqs) { /* eof */
assert(btr_no_parent(btrn));
ret = -E_ALSA_EOF;
- if (!pad->handle)
+ if (!pad)
goto err;
/* wait until pending frames are played */
if (pad->drain_barrier.tv_sec == 0) {
goto err;
return;
}
- if (!pad->handle) {
+ if (!pad) {
int32_t val;
+ pad = para_calloc(sizeof(*pad));
+ wn->private_data = pad;
if (bytes == 0) /* no data available */
return;
get_btr_sample_rate(btrn, &val);
t->error = ret;
}
-__malloc static void *alsa_parse_config(const char *options)
+__malloc static void *alsa_parse_config_or_die(const char *options)
{
- int ret;
- struct alsa_write_args_info *conf
- = para_calloc(sizeof(struct alsa_write_args_info));
+ struct alsa_write_args_info *conf = para_calloc(sizeof(*conf));
PARA_INFO_LOG("options: %s, %zd\n", options, strcspn(options, " \t"));
- ret = alsa_cmdline_parser_string(options, conf, "alsa_write");
- if (ret)
- goto err_out;
- PARA_INFO_LOG("help given: %d\n", conf->help_given);
+ /* exits on errors */
+ alsa_cmdline_parser_string(options, conf, "alsa_write");
return conf;
-err_out:
- free(conf);
- return NULL;
}
static void alsa_free_config(void *conf)
struct alsa_write_args_info dummy;
alsa_cmdline_parser_init(&dummy);
- w->open = alsa_open;
w->close = alsa_close;
w->pre_select = alsa_write_pre_select;
w->post_select = alsa_write_post_select;
- w->parse_config = alsa_parse_config;
+ w->parse_config_or_die = alsa_parse_config_or_die;
w->shutdown = NULL; /* nothing to do */
w->free_config = alsa_free_config;
w->help = (struct ggo_help) {
wma_common wmadec_filter buffer_tree
"
-executables="server recv filter audioc write client afh audiod"
+executables="recv filter audioc write client afh audiod"
recv_cmdline_objs="add_cmdline(recv http_recv dccp_recv udp_recv)"
AC_CHECK_HEADER(osl.h, [], have_osl=no)
AC_CHECK_LIB([osl], [osl_open_table], [], have_osl=no)
if test "$have_osl" = "no"; then
- AC_MSG_ERROR([libosl not found, download it at
+ AC_MSG_WARN([libosl not found, can not build para_server.
+Download libosl at
http://systemlinux.org/~maan/osl
or execute
git clone git://git.tuebingen.mpg.de/osl
])
+else
+ extras="$extras server"
+ executables="$executables server"
+ AC_SUBST(osl_cppflags)
+ server_ldflags="$server_ldflags -L$with_osl_libs"
fi
-AC_SUBST(osl_cppflags)
-server_ldflags="$server_ldflags -L$with_osl_libs"
CPPFLAGS="$OLD_CPPFLAGS"
LDFLAGS="$OLD_LDFLAGS"
LIBS="$OLD_LIBS"
PARA_ERROR(SENDMSG, "sendmsg() failed"), \
PARA_ERROR(RECVMSG, "recvmsg() failed"), \
PARA_ERROR(SCM_CREDENTIALS, "did not receive SCM credentials"), \
+ PARA_ERROR(MAKESOCK, "makesock error"), \
#define UDP_RECV_ERRORS \
* In any case, \a num_bytes contains the number of bytes that have been
* successfully read from \a fd (zero if the first readv() call failed with
* EAGAIN). Note that even if the function returns negative, some data might
- * have been read before the error occured. In this case \a num_bytes is
+ * have been read before the error occurred. In this case \a num_bytes is
* positive.
*
* \sa \ref write_nonblock(), read(2), readv(2).
ret = fec_new(k, n, &pfd->fec);
if (ret < 0)
return ret;
- pfd->btrp = btr_pool_new("fecdec", 20 * k * h->slice_bytes);
+ pfd->btrp = btr_pool_new("fecdec", 64 * 1024);
/* decode and clear the first group */
ret = decode_group(pfd->first_complete_group, fn);
if (ret < 0)
return result;
}
-static int file_write_open(struct writer_node *wn)
+static int prepare_output_file(struct writer_node *wn)
{
- struct private_file_write_data *pfwd = para_calloc(
- sizeof(struct private_file_write_data));
struct file_write_args_info *conf = wn->conf;
char *filename;
int ret;
+ struct private_file_write_data *pfwd = para_calloc(sizeof(*pfwd));
if (conf->filename_given)
filename = conf->filename_arg;
else
filename = random_filename();
- wn->private_data = pfwd;
ret = para_open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
if (!conf->filename_given)
free(filename);
goto out;
pfwd->fd = ret;
ret = mark_fd_blocking(pfwd->fd);
- if (ret >= 0)
- return 1;
+ if (ret < 0)
+ goto out_close;
+ wn->private_data = pfwd;
+ return 1;
+out_close:
close(pfwd->fd);
out:
free(pfwd);
{
struct writer_node *wn = container_of(t, struct writer_node, task);
struct private_file_write_data *pfwd = wn->private_data;
- int ret;
+ int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- t->error = 0;
- ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- if (ret > 0)
- para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
- else if (ret < 0)
- sched_min_delay(s);
+ if (ret == 0)
+ return;
+ if (ret < 0 || !pfwd)
+ return sched_min_delay(s);
+ para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
}
static void file_write_close(struct writer_node *wn)
{
struct private_file_write_data *pfwd = wn->private_data;
+ if (!pfwd)
+ return;
close(pfwd->fd);
free(pfwd);
}
ret = btr_node_status(btrn, wn->min_iqs, BTR_NT_LEAF);
if (ret <= 0)
goto out;
+ if (!pfwd) {
+ ret = prepare_output_file(wn);
+ goto out;
+ }
if (!FD_ISSET(pfwd->fd, &s->wfds))
return;
bytes = btr_next_buffer(btrn, &buf);
t->error = ret;
}
-__malloc static void *file_write_parse_config(const char *options)
+__malloc static void *file_write_parse_config_or_die(const char *options)
{
- struct file_write_args_info *conf
- = para_calloc(sizeof(struct file_write_args_info));
- int ret = file_cmdline_parser_string(options, conf, "file_write");
-
- PARA_INFO_LOG("conf->filename_given: %d\n", conf->filename_given);
- if (!ret)
- return conf;
- free(conf);
- return NULL;
+ struct file_write_args_info *conf = para_calloc(sizeof(*conf));
+
+ /* exits on errors */
+ file_cmdline_parser_string(options, conf, "file_write");
+ return conf;
}
static void file_write_free_config(void *conf)
struct file_write_args_info dummy;
file_cmdline_parser_init(&dummy);
- w->open = file_write_open;
w->pre_select = file_write_pre_select;
w->post_select = file_write_post_select;
- w->parse_config = file_write_parse_config;
+ w->parse_config_or_die = file_write_parse_config_or_die;
w->free_config = file_write_free_config;
w->close = file_write_close;
w->shutdown = NULL; /* nothing to do */
const char *host, uint16_t port_number,
struct flowopts *fo)
{
- struct addrinfo *local = NULL, *src,
- *remote = NULL, *dst, hints;
+ struct addrinfo *local = NULL, *src = NULL, *remote = NULL,
+ *dst = NULL, hints;
unsigned int l3type = AF_UNSPEC;
int rc, on = 1, sockfd = -1,
socktype = sock_type(l4type);
layer4_name(l4type),
host? host : (passive? "[loopback]" : "[localhost]"),
port, gai_strerror(rc));
- return -E_ADDRESS_LOOKUP;
+ rc = -E_ADDRESS_LOOKUP;
+ goto out;
}
/* Iterate over all src/dst combination, exhausting dst first */
close(sockfd);
PARA_ERROR_LOG("can not set SO_REUSEADDR: %s\n",
strerror(rc));
- return -ERRNO_TO_PARA_ERROR(rc);
+ rc = -ERRNO_TO_PARA_ERROR(rc);
+ break;
}
flowopt_setopts(sockfd, fo);
if (src && (src = src->ai_next)) /* restart inner loop */
dst = remote;
}
+out:
if (local)
freeaddrinfo(local);
if (remote)
flowopt_cleanup(fo);
if (src == NULL && dst == NULL) {
- rc = errno;
+ if (rc >= 0)
+ rc = -E_MAKESOCK;
PARA_ERROR_LOG("can not create %s socket %s#%s.\n",
layer4_name(l4type), host? host : (passive?
"[loopback]" : "[localhost]"), port);
- return -ERRNO_TO_PARA_ERROR(rc);
+ return rc;
}
return sockfd;
}
struct private_oss_write_data *powd = wn->private_data;
int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- t->error = 0;
- if (ret < 0)
- sched_min_delay(s);
- else if (ret > 0)
- para_fd_set(powd->fd, &s->wfds, &s->max_fileno);
+ if (ret == 0)
+ return;
+ if (ret < 0 || !powd)
+ return sched_min_delay(s);
+ para_fd_set(powd->fd, &s->wfds, &s->max_fileno);
}
static void oss_close(struct writer_node *wn)
{
struct private_oss_write_data *powd = wn->private_data;
- if (powd->fd >= 0)
- close(powd->fd);
+ if (!powd)
+ return;
+ close(powd->fd);
free(powd);
}
int ret, format;
unsigned ch, rate;
struct oss_write_args_info *conf = wn->conf;
- struct private_oss_write_data *powd = wn->private_data;
+ struct private_oss_write_data *powd = para_calloc(sizeof(*powd));
+ wn->private_data = powd;
PARA_INFO_LOG("opening %s\n", conf->device_arg);
ret = para_open(conf->device_arg, O_WRONLY, 0);
if (ret < 0)
- return ret;
+ goto err_free;
powd->fd = ret;
ret = mark_fd_nonblocking(powd->fd);
if (ret < 0)
return 1;
err:
close(powd->fd);
- powd->fd = -1;
+err_free:
+ free(powd);
return ret;
}
goto out;
if (ret == 0)
return;
- if (powd->fd < 0) {
+ if (!powd) {
int32_t rate, ch, format;
get_btr_sample_rate(btrn, &rate);
get_btr_channels(btrn, &ch);
btr_remove_node(btrn);
}
-static int oss_open(struct writer_node *wn)
-{
- struct private_oss_write_data *powd;
-
- powd = para_calloc(sizeof(*powd));
- wn->private_data = powd;
- powd->fd = -1;
- return 1;
-}
-
-__malloc static void *oss_parse_config(const char *options)
+__malloc static void *oss_parse_config_or_die(const char *options)
{
- int ret;
struct oss_write_args_info *conf = para_calloc(sizeof(*conf));
- ret = oss_cmdline_parser_string(options, conf, "oss_write");
- if (ret)
- goto err_out;
+ /* exits on errors */
+ oss_cmdline_parser_string(options, conf, "oss_write");
return conf;
-err_out:
- free(conf);
- return NULL;
}
static void oss_free_config(void *conf)
struct oss_write_args_info dummy;
oss_cmdline_parser_init(&dummy);
- w->open = oss_open;
w->close = oss_close;
w->pre_select = oss_pre_select;
w->post_select = oss_post_select;
- w->parse_config = oss_parse_config;
+ w->parse_config_or_die = oss_parse_config_or_die;
w->free_config = oss_free_config;
w->shutdown = NULL;
w->help = (struct ggo_help) {
#define ENDIAN_FLAGS 0
#endif
-static int osx_write_open(struct writer_node *wn)
-{
- struct private_osx_write_data *powd = para_calloc(sizeof(*powd));
-
- wn->private_data = powd;
- init_buffers(wn);
- return 0;
-}
-
static int core_audio_init(struct writer_node *wn)
{
- struct private_osx_write_data *powd = wn->private_data;
+ struct private_osx_write_data *powd = para_calloc(sizeof(*powd));
ComponentDescription desc;
Component comp;
AURenderCallbackStruct inputCallback = {osx_callback, powd};
struct btr_node *btrn = wn->btrn;
int32_t val;
+ wn->private_data = powd;
+ init_buffers(wn);
/* where did that default audio output go? */
desc.componentType = kAudioUnitType_Output;
desc.componentSubType = kAudioUnitSubType_DefaultOutput;
e1:
CloseComponent(powd->audio_unit);
e0:
+ free(powd);
+ wn->private_data = NULL;
return ret;
}
-__malloc static void *osx_write_parse_config(const char *options)
+__malloc static void *osx_write_parse_config_or_die(const char *options)
{
- struct osx_write_args_info *conf
- = para_calloc(sizeof(struct osx_write_args_info));
- PARA_INFO_LOG("options: %s\n", options);
- int ret = osx_cmdline_parser_string(options, conf, "osx_write");
- if (ret)
- goto err_out;
- return conf;
-err_out:
- free(conf);
- return NULL;
+ struct osx_write_args_info *conf = para_calloc(sizeof(*conf));
+ /* exits on errors */
+ osx_cmdline_parser_string(options, conf, "osx_write");
+ return conf;
}
static void osx_free_config(void *conf)
{
struct private_osx_write_data *powd = wn->private_data;
+ if (!powd)
+ return;
PARA_INFO_LOG("closing writer node %p\n", wn);
AudioOutputUnitStop(powd->audio_unit);
AudioUnitUninitialize(powd->audio_unit);
size_t bytes;
int ret = 0;
- while (powd->to->remaining <= 0) {
+ while (!powd || powd->to->remaining <= 0) {
ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
if (ret <= 0)
break;
- if (powd->sample_rate == 0) {
+ if (!powd) {
ret = core_audio_init(wn);
if (ret < 0)
break;
+ powd = wn->private_data;
}
btr_merge(btrn, 8192);
bytes = btr_next_buffer(btrn, &data);
}
powd->to = powd->to->next;
}
- if (ret < 0 && powd->from->remaining <= 0) {
+ if (ret < 0 && (!powd || powd->from->remaining <= 0)) {
btr_remove_node(btrn);
t->error = ret;
}
struct writer_node *wn = container_of(t, struct writer_node, task);
struct private_osx_write_data *powd = wn->private_data;
struct timeval tmp = {.tv_sec = 1, .tv_usec = 0}, delay = tmp;
- unsigned long divisor;
- size_t numbytes = powd->to->remaining * sizeof(short);
+ unsigned long factor;
+ size_t numbytes;
int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
- if (ret < 0)
- sched_min_delay(s);
- if (ret <= 0 || numbytes < wn->min_iqs)
+ if (ret == 0)
return;
- divisor = powd->sample_rate * wn->min_iqs / numbytes;
- if (divisor)
- tv_divide(divisor, &tmp, &delay);
+ if (ret < 0 || !powd)
+ return sched_min_delay(s);
+ assert(powd->sample_rate > 0);
+ assert(wn->min_iqs > 0);
+ numbytes = powd->to->remaining * sizeof(short);
+ factor = numbytes / powd->sample_rate / wn->min_iqs;
+ tv_scale(factor, &tmp, &delay);
sched_request_timeout(&delay, s);
}
struct osx_write_args_info dummy;
osx_cmdline_parser_init(&dummy);
- w->open = osx_write_open;
w->close = osx_write_close;
w->pre_select = osx_write_pre_select;
w->post_select = osx_write_post_select;
- w->parse_config = osx_write_parse_config;
+ w->parse_config_or_die = osx_write_parse_config_or_die;
w->free_config = osx_free_config;
w->shutdown = NULL; /* nothing to do */
w->help = (struct ggo_help) {
_x > 0? _x : -_x; })
/**
- * define a standard log function that always writes to stderr
+ * Define a standard log function that always writes to stderr.
*
* \param loglevel_barrier If the loglevel of the current message
* is less than that, the message is going to be ignored.
"Written by Andre Noll.\n" \
"Report bugs to <maan@systemlinux.org>.\n"
-/** print out \p VERSION_TEXT and exit if version flag was given */
+/** Print out \p VERSION_TEXT and exit if version flag was given. */
#define HANDLE_VERSION_FLAG(_prefix, _args_info_struct) \
if (_args_info_struct.version_given) { \
printf("%s", VERSION_TEXT(_prefix)); \
exit(EXIT_SUCCESS); \
}
-/* Sent by para_client to initiate the authentication procedure. */
+/** Sent by para_client to initiate the authentication procedure. */
#define AUTH_REQUEST_MSG "auth rsa "
-/** sent by para_server for commands that expect a data file */
+/** Sent by para_server for commands that expect a data file. */
#define AWAITING_DATA_MSG "\nAwaiting Data."
-/** sent by para_server if authentication was successful */
+/** Sent by para_server if authentication was successful. */
#define PROCEED_MSG "Proceed."
-/** length of the \p PROCEED_MSG string */
+/** Length of the \p PROCEED_MSG string. */
#define PROCEED_MSG_LEN strlen(PROCEED_MSG)
-/** sent by para_client to indicate the end of the command line */
+/** Sent by para_client to indicate the end of the command line. */
#define EOC_MSG "\nEnd of Command."
/* exec */
return ((max + 0.0) * (random() / (RAND_MAX + 1.0)));
}
+/** Divide and round up to next integer. */
#define DIV_ROUND_UP(x, y) ({ \
typeof(y) _divisor = y; \
((x) + _divisor - 1) / _divisor; })
the tip of topic branches you are interested in from the output of
"git log next"). You should be able to safely build on top of them.
+However, at times "next" will be rebuilt from the tip of "master" to
+get rid of merge commits that will never be in "master. The commit
+that replaces "next" will usually have the identical tree, but it
+will have different ancestry from the tip of "master".
+
The "pu" (proposed updates) branch bundles the remainder of the
topic branches. The "pu" branch, and topic branches that are only in
"pu", are subject to rebasing in general. By the above definition
/**
* The command line parser of the writer.
*
- * It should check whether the command line options given by \a options are
- * valid. On success, it should return a pointer to the writer-specific
- * configuration data determined by \a options. Note that this might be called
- * more than once with different values of \a options. \sa \ref free_config().
+ * It should check whether the command line options given by \a options
+ * are valid and return a pointer to the writer-specific configuration
+ * data determined by \a options. This function must either succeed or
+ * call exit(). Note that parse_config_or_die() might be called more
+ * than once with different values of \a options. \sa \ref
+ * free_config().
*/
- void *(*parse_config)(const char *options);
+ void *(*parse_config_or_die)(const char *options);
/**
* Dellocate all configuration resources.
*
- * This should free whatever was allocated by \ref parse_config().
+ * This should free whatever was allocated by \ref parse_config_or_die().
*/
void (*free_config)(void *config);
- /**
- * Open one instance of this writer.
- *
- * This function should perform any work necessary to write the incoming
- * stream. To this aim, it may allocate its private data structure and store
- * a pointer to that structure via the given writer_node parameter.
- */
- int (*open)(struct writer_node *);
/**
* Prepare the fd sets for select.
*
c = wa[len];
if (c && c != ' ')
continue;
- if (c && !writers[i].parse_config)
- return NULL;
*writer_num = i;
- return writers[i].parse_config(c? wa + len + 1 : "");
+ return writers[i].parse_config_or_die(c? wa + len + 1 : "");
}
PARA_ERROR_LOG("writer not found\n");
return NULL;
}
+/**
+ * Open a writer node and register the corresponding task.
+ *
+ * \param wn The writer node to open.
+ * \param parent The parent btr node (the source for the writer node).
+ *
+ * The configuration of the writer node stored in \p wn->conf must be
+ * initialized before this function may be called.
+ */
void register_writer_node(struct writer_node *wn, struct btr_node *parent)
{
struct writer *w = writers + wn->writer_num;
char *name = make_message("%s writer", writer_names[wn->writer_num]);
- int ret;
wn->btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = name, .parent = parent,
.handler = w->execute, .context = wn));
strcpy(wn->task.status, name);
free(name);
- ret = w->open(wn);
wn->task.post_select = w->post_select;
wn->task.pre_select = w->pre_select;
register_task(&wn->task);
wn->conf = check_writer_arg(arg, &wn->writer_num);
else {
wn->writer_num = DEFAULT_WRITER;
- wn->conf = writers[DEFAULT_WRITER].parse_config("");
+ wn->conf = writers[DEFAULT_WRITER].parse_config_or_die("");
}
if (!wn->conf)
return -E_WRITE_COMMON_SYNTAX;
return 1;
}
-
/**
* Print the help text of all writers to stdout.
*
char *buf = NULL;
int ret = btr_exec_up(btrn, cmd, &buf);
- assert(ret >= 0);
+ if (ret < 0) {
+ /*
+ * This really should not happen. It means one of our parent
+ * nodes died unexpectedly. Proceed with fingers crossed.
+ */
+ PARA_CRIT_LOG("cmd %s: %s\n", cmd, para_strerror(-ret));
+ *result = 0;
+ return;
+ }
ret = para_atoi32(buf, result);
assert(ret >= 0);
free(buf);