X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=audiod.c;h=b880eb7f5d819ec60924587b32c72eee20b3bde9;hp=bde2b1e1299a51548b727c13c2d32df760067fda;hb=926c584394c29354752458b06f7226f0da0dd35e;hpb=335730538150250f32c0df0b184fb494e2bb0df3 diff --git a/audiod.c b/audiod.c index bde2b1e1..b880eb7f 100644 --- a/audiod.c +++ b/audiod.c @@ -7,6 +7,7 @@ /** \file audiod.c the paraslash's audio daemon */ #include #include +#include #include "para.h" #include "error.h" @@ -41,9 +42,9 @@ struct audio_format_info { void *receiver_conf; /** the number of filters that should be activated for this audio format */ unsigned int num_filters; - /** pointer to the array of filters to be activated */ - struct filter **filters; - /** pointer to the array of filter configurations */ + /** Array of filter numbers to be activated. */ + unsigned *filter_nums; + /** Pointer to the array of filter configurations. */ void **filter_conf; /** the number of filters that should be activated for this audio format */ unsigned int num_writers; @@ -212,20 +213,17 @@ static void kill_all_decoders(int error) FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - if (s->wng && !s->wng->task.error) { - PARA_INFO_LOG("unregistering writer node group in slot %d\n", + if (s->wng && s->wng->task.error >= 0) { + PARA_INFO_LOG("deactivating wng in slot %d\n", i); - wng_unregister(s->wng); s->wng->task.error = error; } - if (s->fc && !s->fc->task.error) { - PARA_INFO_LOG("unregistering filter chain in slot %d\n", i); - unregister_task(&s->fc->task); + if (s->fc && s->fc->task.error >= 0) { + PARA_INFO_LOG("deactivatimg filter chain in slot %d\n", i); s->fc->task.error = error; } - if (s->receiver_node && !s->receiver_node->task.error) { - PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i); - unregister_task(&s->receiver_node->task); + if (s->receiver_node && s->receiver_node->task.error >= 0) { + PARA_INFO_LOG("deactivating receiver_node in slot %d\n", i); s->receiver_node->task.error = error; } } @@ -268,6 +266,7 @@ static void open_filters(int slot_num) { struct slot_info *s = &slot[slot_num]; struct audio_format_info *a = &afi[s->format]; + struct filter_node *fn; int nf = a->num_filters; int i; @@ -276,26 +275,27 @@ static void open_filters(int slot_num) return; PARA_INFO_LOG("opening %s filters\n", audio_formats[s->format]); s->fc = para_calloc(sizeof(struct filter_chain)); - INIT_LIST_HEAD(&s->fc->filters); + s->fc->filter_nodes = para_malloc(nf * sizeof(struct filter_chain)); s->fc->inbuf = s->receiver_node->buf; s->fc->in_loaded = &s->receiver_node->loaded; s->fc->input_error = &s->receiver_node->task.error; s->fc->task.pre_select = filter_pre_select; + s->fc->task.post_select = NULL; s->fc->task.error = 0; + s->fc->num_filters = nf; s->receiver_node->output_error = &s->fc->task.error; sprintf(s->fc->task.status, "filter chain"); - for (i = 0; i < nf; i++) { - struct filter_node *fn = para_calloc(sizeof(struct filter_node)); + FOR_EACH_FILTER_NODE(fn, s->fc, i) { + struct filter *f = filters + a->filter_nums[i]; + fn->filter_num = a->filter_nums[i]; fn->conf = a->filter_conf[i]; fn->fc = s->fc; - fn->filter = a->filters[i]; + fn->loaded = 0; INIT_LIST_HEAD(&fn->callbacks); - list_add_tail(&fn->node, &s->fc->filters); - fn->filter->open(fn); + f->open(fn); PARA_NOTICE_LOG("%s filter %d/%d (%s) started in slot %d\n", - audio_formats[s->format], i + 1, nf, - fn->filter->name, slot_num); + audio_formats[s->format], i, nf, f->name, slot_num); s->fc->outbuf = fn->buf; s->fc->out_loaded = &fn->loaded; } @@ -336,38 +336,16 @@ static void open_writers(int slot_num) return; } s->wstime = *now; - activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters); + activate_inactive_grab_clients(slot_num, s->format, s->fc); } -#if 0 -static void rn_event_handler(struct task *t) -{ - struct receiver_node *rn = t->private_data; - int i; - - PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret)); - unregister_task(t); - rn->error = t->ret; - /* set restart barrier */ - FOR_EACH_SLOT(i) { - struct timeval restart_delay = {0, 10 * 1000}; - if (slot[i].receiver_node != rn) - continue; - if (rn->error != -E_RECV_EOF) - /* don't reconnect immediately on errors */ - restart_delay.tv_sec = 5; - tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier); - } -} -#endif - static int open_receiver(int format) { struct audio_format_info *a = &afi[format]; struct slot_info *s; int ret, slot_num; struct receiver_node *rn; - const struct timeval restart_delay = {1, 0}; + const struct timeval restart_delay = {2, 0}; ret = get_empty_slot(); if (ret < 0) @@ -391,9 +369,10 @@ static int open_receiver(int format) rn->task.post_select = a->receiver->post_select; sprintf(rn->task.status, "%s receiver node", rn->receiver->name); register_task(&rn->task); - return 1; + ret = 1; err: - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); tv_add(now, &restart_delay, &afi[format].restart_barrier); return ret; } @@ -526,8 +505,6 @@ static int check_stat_line(char *line, __a_unused void *data) struct timeval tv = {sec, usec}; compute_time_diff(&tv); } - if (stat_task->clock_diff_count) - stat_task->clock_diff_count--; break; case SI_FORMAT: stat_task->current_audio_format_num = get_audio_format_num( @@ -536,96 +513,6 @@ static int check_stat_line(char *line, __a_unused void *data) return 1; } -static void try_to_close_slot(int slot_num) -{ - struct slot_info *s = &slot[slot_num]; - - if (s->format < 0) - return; - if (s->receiver_node && s->receiver_node->task.error >= 0) - return; - if (s->fc && s->fc->task.error >= 0) - return; - if (s->wng && s->wng->task.error >= 0) - return; - PARA_INFO_LOG("closing slot %d\n", slot_num); - wng_close(s->wng); - close_filters(s->fc); - free(s->fc); - close_receiver(slot_num); - clear_slot(slot_num); -} - -/* - * Check if any receivers/filters/writers need to be started and do so if - * necessary. Since the pre_select function didn't have a chance yet to put - * file descriptors into the fd sets given by s, make the upcoming select() - * return immediately to avoid a long timeout in case we started something. - */ -static void audiod_pre_select(struct sched *s, __a_unused struct task *t) -{ - int i; - struct timeval min_delay = {0, 1}; - - if (audiod_status != AUDIOD_ON || !stat_task->playing) - return kill_all_decoders(-E_NOT_PLAYING); - if (open_current_receiver(s)) - s->timeout = min_delay; - FOR_EACH_SLOT(i) { - struct slot_info *sl = &slot[i]; - struct audio_format_info *a; - struct timeval diff; - - if (sl->format < 0) - continue; - a = &afi[sl->format]; - if (!sl->receiver_node) - continue; - if ((!a->num_filters || sl->fc) && sl->wng) - continue; /* everything already started */ - if (!a->num_filters) { - if (sl->receiver_node->loaded && !sl->wng) { - open_writers(i); - s->timeout = min_delay; - } - continue; - } - if (sl->receiver_node->loaded && !sl->fc) { - open_filters(i); - s->timeout = min_delay; - continue; - } - if (sl->wng || !sl->fc || !*sl->fc->out_loaded) - continue; - if (tv_diff(now, &initial_delay_barrier, &diff) > 0) { - open_writers(i); - s->timeout = min_delay; - continue; - } - PARA_INFO_LOG("initial delay: %lu ms left\n", tv2ms(&diff)); - if (tv_diff(&s->timeout, &diff, NULL) > 0) { - s->timeout = diff; - } - } -} - -static void audiod_post_select(__a_unused struct sched *s, - __a_unused struct task *t) -{ - int i; - - FOR_EACH_SLOT(i) - try_to_close_slot(i); -} - -static void init_audiod_task(struct task *t) -{ - t->pre_select = audiod_pre_select; - t->post_select = audiod_post_select; - t->error = 0; - sprintf(t->status, "audiod task"); -} - static int parse_stream_command(const char *txt, char **cmd) { char *p = strchr(txt, ':'); @@ -651,10 +538,10 @@ static int add_filter(int format, char *cmdline) filter_num = check_filter_arg(cmdline, &a->filter_conf[nf]); if (filter_num < 0) return filter_num; - a->filters[nf] = &filters[filter_num]; + a->filter_nums[nf] = filter_num; a->num_filters++; - PARA_INFO_LOG("%s filter %d: %s\n", audio_formats[format], nf + 1, - a->filters[nf]->name); + PARA_INFO_LOG("%s filter %d: %s\n", audio_formats[format], nf, + filters[filter_num].name); return filter_num; } @@ -786,7 +673,7 @@ static int init_filters(void) PARA_INFO_LOG("maximal number of filters: %d\n", nf); FOR_EACH_AUDIO_FORMAT(i) { afi[i].filter_conf = para_malloc(nf * sizeof(void *)); - afi[i].filters = para_malloc(nf * sizeof(struct filter *)); + afi[i].filter_nums = para_malloc(nf * sizeof(unsigned)); } if (!conf.no_default_filters_given) return init_default_filters(); @@ -942,10 +829,6 @@ static void close_stat_pipe(void) status_item_list[SI_BASENAME]); stat_client_write(stat_task->stat_item_values[SI_BASENAME], SI_BASENAME); - if (stat_task->clock_diff_count) { - stat_task->clock_diff_barrier.tv_sec = now->tv_sec + 1; - stat_task->clock_diff_barrier.tv_usec = now->tv_usec; - } } /** @@ -970,90 +853,147 @@ void __noreturn clean_exit(int status, const char *msg) } /* avoid busy loop if server is down */ -static void set_stat_task_restart_barrier(void) +static void set_stat_task_restart_barrier(unsigned seconds) { - struct timeval delay = {5, 0}; + struct timeval delay = {seconds, 0}; tv_add(now, &delay, &stat_task->restart_barrier); } -#if 0 -static void client_task_event_handler(__a_unused struct task *t) +static void try_to_close_slot(int slot_num) { - int i; + struct slot_info *s = &slot[slot_num]; - if (t->ret == -E_HANDSHAKE_COMPLETE) + if (s->format < 0) return; - unregister_task(t); - close_stat_pipe(); - if (t->ret != -E_SERVER_EOF) - stat_task->clock_diff_count = conf.clock_diff_count_arg; - set_stat_task_restart_barrier(); - FOR_EACH_AUDIO_FORMAT(i) - afi[i].restart_barrier = stat_task->restart_barrier; + if (s->receiver_node && s->receiver_node->task.error != -E_TASK_UNREGISTERED) + return; + if (s->fc && s->fc->task.error != -E_TASK_UNREGISTERED) + return; + if (s->wng && s->wng->task.error != -E_TASK_UNREGISTERED) + return; + PARA_INFO_LOG("closing slot %d\n", slot_num); + wng_close(s->wng); + close_filters(s->fc); + free(s->fc); + close_receiver(slot_num); + clear_slot(slot_num); } -#endif -static void status_pre_select(struct sched *s, struct task *t) +/* + * Check if any receivers/filters/writers need to be started and do so if + * necessary. + */ +static void start_stop_decoders(struct sched *s) { - struct status_task *st = container_of(t, struct status_task, task); - int ret; + int i; - if (st->ct || audiod_status == AUDIOD_OFF) - return; - if (!st->clock_diff_count && tv_diff(now, &st->restart_barrier, NULL) - < 0) - return; - if (st->clock_diff_count) { - char *argv[] = {"audiod", "stat", "1", NULL}; - int argc = 3; - if (tv_diff(now, &st->clock_diff_barrier, NULL) < 0) - return; - PARA_INFO_LOG("clock diff count: %d\n", st->clock_diff_count); - ret = client_open(argc, argv, &st->ct); + FOR_EACH_SLOT(i) + try_to_close_slot(i); + if (audiod_status != AUDIOD_ON || !stat_task->playing) + return kill_all_decoders(-E_NOT_PLAYING); + open_current_receiver(s); + FOR_EACH_SLOT(i) { + struct slot_info *sl = &slot[i]; + struct audio_format_info *a; + struct timeval diff; - } else { - char *argv[] = {"audiod", "stat", NULL}; - int argc = 2; - ret = client_open(argc, argv, &st->ct); + if (sl->format < 0) + continue; + a = &afi[sl->format]; + if (!sl->receiver_node) + continue; + if ((!a->num_filters || sl->fc) && sl->wng) + continue; /* everything already started */ + if (!a->num_filters) { + if (sl->receiver_node->loaded && !sl->wng) { + open_writers(i); + } + continue; + } + if (sl->receiver_node->loaded && !sl->fc) { + open_filters(i); + continue; + } + if (sl->wng || !sl->fc || !*sl->fc->out_loaded) + continue; + if (tv_diff(now, &initial_delay_barrier, &diff) > 0) { + open_writers(i); + continue; + } + PARA_INFO_LOG("initial delay: %lu ms left\n", tv2ms(&diff)); + if (tv_diff(&s->timeout, &diff, NULL) > 0) { + s->timeout = diff; + } } - set_stat_task_restart_barrier(); - if (ret < 0) - return; - s->timeout.tv_sec = 0; - s->timeout.tv_usec = 1; } -static void status_post_select(__a_unused struct sched *s, struct task *t) + +/* restart the client task if necessary */ +static void status_pre_select(struct sched *s, struct task *t) { struct status_task *st = container_of(t, struct status_task, task); - unsigned bytes_left; - if (!st->ct || st->ct->status != CL_RECEIVING) - return; - if (st->ct && audiod_status == AUDIOD_OFF) { - unregister_task(&st->ct->task); + if (audiod_status == AUDIOD_OFF) { + if (!st->ct) + goto out; + if (st->ct->task.error >= 0) { + st->ct->task.error = -E_AUDIOD_OFF; + goto out; + } + if (st->ct->task.error != -E_TASK_UNREGISTERED) + goto out; close_stat_pipe(); st->clock_diff_count = conf.clock_diff_count_arg; - return; + goto out; } - bytes_left = for_each_line(st->ct->buf, st->ct->loaded, - &check_stat_line, NULL); - if (st->ct->loaded != bytes_left) { - st->last_status_read = *now; - st->ct->loaded = bytes_left; - } else { - struct timeval diff; - tv_diff(now, &st->last_status_read, &diff); - if (diff.tv_sec > 61) + if (st->ct) { + unsigned bytes_left; + if (st->ct->task.error < 0) { + if (st->ct->task.error != -E_TASK_UNREGISTERED) + goto out; close_stat_pipe(); + goto out; + } + if (st->ct->status != CL_RECEIVING) + goto out; + bytes_left = for_each_line(st->ct->buf, st->ct->loaded, + &check_stat_line, NULL); + if (st->ct->loaded != bytes_left) { + st->last_status_read = *now; + st->ct->loaded = bytes_left; + } else { + struct timeval diff; + tv_diff(now, &st->last_status_read, &diff); + if (diff.tv_sec > 61) + st->ct->task.error = -E_STATUS_TIMEOUT; + } + goto out; } + if (tv_diff(now, &st->restart_barrier, NULL) < 0) + goto out; + if (st->clock_diff_count) { /* get status only one time */ + char *argv[] = {"audiod", "stat", "1", NULL}; + int argc = 3; + PARA_INFO_LOG("clock diff count: %d\n", st->clock_diff_count); + st->clock_diff_count--; + client_open(argc, argv, &st->ct); + set_stat_task_restart_barrier(2); + + } else { + char *argv[] = {"audiod", "stat", NULL}; + int argc = 2; + client_open(argc, argv, &st->ct); + set_stat_task_restart_barrier(5); + } + st->last_status_read = *now; +out: + start_stop_decoders(s); } static void init_status_task(struct status_task *st) { memset(st, 0, sizeof(struct status_task)); st->task.pre_select = status_pre_select; - st->task.post_select = status_post_select; st->sa_time_diff_sign = 1; st->clock_diff_count = conf.clock_diff_count_arg; st->current_audio_format_num = -1; @@ -1093,21 +1033,25 @@ int main(int argc, char *argv[]) int ret, i; struct sched s; struct command_task command_task_struct, *cmd_task = &command_task_struct; - struct task audiod_task_struct, *audiod_task = &audiod_task_struct; + struct audiod_cmdline_parser_params params = { + .override = 0, + .initialize = 1, + .check_required = 0, + .check_ambiguity = 0, + .print_errors = 1 + }; valid_fd_012(); - audiod_cmdline_parser(argc, argv, &conf); + audiod_cmdline_parser_ext(argc, argv, &conf, ¶ms); HANDLE_VERSION_FLAG("audiod", conf); para_drop_privileges(conf.user_arg, conf.group_arg); config_file = configfile_exists(); if (config_file) { - struct audiod_cmdline_parser_params params = { - .override = 0, - .initialize = 0, - .check_required = 0, - .check_ambiguity = 0 - - }; + params.override = 0; + params.initialize = 0; + params.check_required = 1; + params.check_ambiguity = 0; + params.print_errors = 1; if (audiod_cmdline_parser_config_file(config_file, &conf, ¶ms)) { PARA_EMERG_LOG("parse error in config file\n"); exit(EXIT_FAILURE); @@ -1132,7 +1076,6 @@ int main(int argc, char *argv[]) init_status_task(stat_task); init_command_task(cmd_task); - init_audiod_task(audiod_task); if (conf.daemon_given) daemon_init(); @@ -1140,7 +1083,6 @@ int main(int argc, char *argv[]) register_task(&sig_task->task); register_task(&cmd_task->task); register_task(&stat_task->task); - register_task(audiod_task); s.default_timeout.tv_sec = 0; s.default_timeout.tv_usec = 99 * 1000; ret = schedule(&s);