X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=audiod.c;h=3b6111964a8012de6bbdddd4b7ec340a1173ce48;hp=032f9cd0743a85a492b5cb446ebc965dc3b2cb1f;hb=b8bc24e242b088195249574bb90cda2e1ee1d9e4;hpb=30a64ad83ffc0e815cb135b0587ebf5a1f9af081 diff --git a/audiod.c b/audiod.c index 032f9cd0..3b611196 100644 --- a/audiod.c +++ b/audiod.c @@ -22,14 +22,13 @@ #include "audiod.cmdline.h" #include "list.h" -#include "close_on_fork.h" #include "sched.h" #include "recv.h" #include "filter.h" #include "grab_client.cmdline.h" #include "grab_client.h" - -#include "error.h" +#include "client.cmdline.h" +#include "client.h" #include "audiod.h" #include "net.h" #include "daemon.h" @@ -37,6 +36,7 @@ #include "fd.h" #include "write.h" #include "write_common.h" +#include "error.h" /** define the array of error lists needed by para_audiod */ INIT_AUDIOD_ERRLISTS; @@ -70,7 +70,7 @@ struct slot_info slot[MAX_STREAM_SLOTS]; int audiod_status = AUDIOD_ON; -struct gengetopt_args_info conf; +struct audiod_args_info conf; static char *socket_name; static FILE *logfile; static struct audio_format_info afi[NUM_AUDIO_FORMATS]; @@ -79,6 +79,7 @@ static struct signal_task signal_task_struct, *sig_task = &signal_task_struct; static struct status_task status_task_struct; struct status_task *stat_task = &status_task_struct; +static struct timeval initial_delay_barrier; /** * the task for handling audiod commands @@ -92,9 +93,15 @@ struct command_task { struct task task; }; +/** + * task for signal handling + */ struct signal_task { + /** the signal pipe */ int fd; + /** the number of the most recent signal */ int signum; + /** the associated task structure */ struct task task; }; @@ -163,10 +170,8 @@ static void setup_signal_handling(void) PARA_INFO_LOG("signal pipe: fd %d\n", sig_task->fd); para_install_sighandler(SIGINT); para_install_sighandler(SIGTERM); - para_install_sighandler(SIGCHLD); para_install_sighandler(SIGHUP); - para_install_sighandler(SIGPIPE); -// signal(SIGPIPE, SIG_IGN); + signal(SIGPIPE, SIG_IGN); } static void clear_slot(int slot_num) @@ -182,20 +187,15 @@ static void close_receiver(int slot_num) { struct slot_info *s = &slot[slot_num]; struct audio_format_info *a; - const struct timeval restart_delay = {0, 200 * 1000}; if (s->format < 0 || !s->receiver_node) return; a = &afi[s->format]; PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n", audio_formats[s->format] , slot_num, s->receiver_node->eof); -// if (!s->receiver_node->eof) -// unregister_task(&s->receiver_node->task); a->receiver->close(s->receiver_node); free(s->receiver_node); s->receiver_node = NULL; - /* set restart barrier */ - tv_add(now, &restart_delay, &afi[s->format].restart_barrier); } static void kill_all_decoders(void) @@ -242,32 +242,14 @@ static int get_empty_slot(void) return -E_NO_MORE_SLOTS; } -static int decoder_running(int format) -{ - int i, ret = 0; - struct slot_info *s; - - FOR_EACH_SLOT(i) { - s = &slot[i]; - if (s->format == format && s->receiver_node) - ret |= 1; - if (s->format == format && s->wng) - ret |= 2; - } - return ret; -} - static void close_stat_pipe(void) { int i; - if (stat_task->fd < 0) + if (!stat_task->pcd) return; - PARA_NOTICE_LOG("%s", "closing status pipe\n"); - close(stat_task->fd); - del_close_on_fork_list(stat_task->fd); - stat_task->fd = -1; -// kill_all_decoders(); + client_close(stat_task->pcd); + stat_task->pcd = NULL; for (i = 0; i < NUM_STAT_ITEMS; i++) { free(stat_task->stat_item_values[i]); stat_task->stat_item_values[i] = NULL; @@ -288,8 +270,7 @@ void __noreturn clean_exit(int status, const char *msg) PARA_EMERG_LOG("%s\n", msg); if (socket_name) unlink(socket_name); - if (stat_task->fd >= 0) - close_stat_pipe(); + close_stat_pipe(); exit(status); } @@ -405,10 +386,18 @@ static void open_writers(int slot_num) static void rn_event_handler(struct task *t) { struct receiver_node *rn = t->private_data; + const struct timeval restart_delay = {0, 10 * 1000}; + int i; PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); unregister_task(t); rn->eof = 1; + /* set restart barrier */ + FOR_EACH_SLOT(i) { + if (slot[i].receiver_node != rn) + continue; + tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier); + } } static void open_receiver(int format) @@ -445,17 +434,30 @@ static void open_receiver(int format) register_task(&rn->task); } +static int receiver_running(int format) +{ + int i; + + FOR_EACH_SLOT(i) { + struct slot_info *s = &slot[i]; + if (s->format == format && s->receiver_node + && !s->receiver_node->eof) + return 1; + } + return 0; +} + static int open_current_receiver(struct sched *s) { int i; struct timeval diff; - if (!stat_task->af_status) + if (!stat_task->af_status || !stat_task->pcd) return 0; i = get_audio_format_num(stat_task->af_status); if (i < 0) return 0; - if (decoder_running(i)) + if (receiver_running(i)) return 0; if (tv_diff(now, &afi[i].restart_barrier, &diff) < 0) { s->timeout = diff; @@ -539,8 +541,18 @@ static void check_stat_line(char *line) break; case SI_STREAM_START: if (sscanf(line + ilen + 1, "%lu.%lu", &sec, &usec) == 2) { + struct timeval a_start, delay; + delay.tv_sec = conf.stream_delay_arg / 1000; + delay.tv_usec = (conf.stream_delay_arg % 1000) * 1000; stat_task->server_stream_start.tv_sec = sec; stat_task->server_stream_start.tv_usec = usec; + if (stat_task->sa_time_diff_sign < 0) + tv_add(&stat_task->server_stream_start, + &stat_task->sa_time_diff, &a_start); + else + tv_diff(&stat_task->server_stream_start, + &stat_task->sa_time_diff, &a_start); + tv_add(&a_start, &delay, &initial_delay_barrier); } break; case SI_CURRENT_TIME: @@ -555,13 +567,6 @@ static void check_stat_line(char *line) static void handle_signal(int sig) { switch (sig) { - case SIGCHLD: - for (;;) { - pid_t pid = para_reap_child(); - if (pid <= 0) - return; - PARA_CRIT_LOG("para_client died (pid %d)\n", pid); - } case SIGINT: case SIGTERM: case SIGHUP: @@ -610,6 +615,7 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t) FOR_EACH_SLOT(i) { struct slot_info *sl = &slot[i]; struct audio_format_info *a; + struct timeval diff; if (sl->format < 0) continue; @@ -628,9 +634,20 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t) s->timeout = min_delay; continue; } - if (sl->fc && *sl->fc->out_loaded && !sl->wng) { + if (!sl->fc || !*sl->fc->out_loaded || sl->wng) + continue; + if (tv_diff(now, &initial_delay_barrier, &diff) > 0) { + PARA_INFO_LOG("barrier: %lu:%lu, now: %lu, %lu\n", + initial_delay_barrier.tv_sec, + initial_delay_barrier.tv_usec, + now->tv_sec, now->tv_usec); open_writers(i); s->timeout = min_delay; + continue; + } + PARA_INFO_LOG("inital delay: %lu ms left\n", tv2ms(&diff)); + if (tv_diff(&s->timeout, &diff, NULL) > 0) { + s->timeout = diff; } } } @@ -736,18 +753,19 @@ static int init_receivers(void) PARA_INFO_LOG("initializing %s receiver\n", receivers[i].name); receivers[i].init(&receivers[i]); } - for (i = 0; i < conf.receiver_given; i++) { + for (i = conf.receiver_given - 1; i >= 0; i--) { char *arg = conf.receiver_arg[i]; - char *recv = strchr(arg, ':'); + char *recv_arg = strchr(arg, ':'); + PARA_INFO_LOG("arg: %s\n", arg); ret = -E_MISSING_COLON; - if (!recv) + if (!recv_arg) goto out; - *recv = '\0'; - recv++; + *recv_arg = '\0'; + recv_arg++; ret = get_audio_format_num(arg); if (ret < 0) goto out; - afi[ret].receiver_conf = check_receiver_arg(recv, &receiver_num); + afi[ret].receiver_conf = check_receiver_arg(recv_arg, &receiver_num); if (!afi[ret].receiver_conf) { ret = -E_RECV_SYNTAX; goto out; @@ -881,25 +899,11 @@ static int audiod_get_socket(void) PARA_EMERG_LOG("%s", "can not listen on socket\n"); exit(EXIT_FAILURE); /* do not unlink socket */ } - add_close_on_fork_list(fd); + mark_fd_nonblock(fd); return fd; } -static int open_stat_pipe(void) -{ - int ret, fd[3] = {-1, 1, 0}; - pid_t pid; - ret = para_exec_cmdline_pid(&pid, BINDIR "/para_client stat", fd); - if (ret >= 0) { - ret = fd[1]; - PARA_NOTICE_LOG("stat pipe opened, fd %d\n", ret); - add_close_on_fork_list(ret); - } else - clean_exit(EXIT_FAILURE, "failed to open status pipe"); - return ret; -} - -void signal_event_handler(struct task *t) +static void signal_event_handler(struct task *t) { struct signal_task *st = t->private_data; @@ -909,14 +913,14 @@ void signal_event_handler(struct task *t) handle_signal(st->signum); } -void signal_pre_select(struct sched *s, struct task *t) +static void signal_pre_select(struct sched *s, struct task *t) { struct signal_task *st = t->private_data; t->ret = 1; para_fd_set(st->fd, &s->rfds, &s->max_fileno); } -void signal_post_select(struct sched *s, struct task *t) +static void signal_post_select(struct sched *s, struct task *t) { struct signal_task *st = t->private_data; t->ret = 1; @@ -926,7 +930,7 @@ void signal_post_select(struct sched *s, struct task *t) st->signum = para_next_signal(); } -void signal_setup_default(struct signal_task *st) +static void signal_setup_default(struct signal_task *st) { st->task.pre_select = signal_pre_select; st->task.post_select = signal_post_select; @@ -967,6 +971,15 @@ static void init_command_task(struct command_task *ct) sprintf(ct->task.status, "command task"); } +static void client_task_event_handler(__a_unused struct task *t) +{ + struct private_client_data *pcd = t->private_data; + if (t->ret == -E_HANDSHAKE_COMPLETE) + return; + unregister_task(t); + pcd->eof = 1; +} + static void status_event_handler(__a_unused struct task *t) { struct timeval delay = {1, 0}; @@ -984,34 +997,34 @@ static void status_event_handler(__a_unused struct task *t) static void status_pre_select(struct sched *s, struct task *t) { struct status_task *st = t->private_data; + int argc = 2; + char *argv[] = {"audiod", "stat", NULL}; t->ret = 1; - if (st->fd >= 0 && audiod_status == AUDIOD_OFF) + if (st->pcd && (audiod_status == AUDIOD_OFF || st->pcd->eof)) close_stat_pipe(); - if (st->fd < 0 && audiod_status != AUDIOD_OFF + if (!st->pcd && audiod_status != AUDIOD_OFF && tv_diff(now, &st->restart_barrier, NULL) > 0) { - st->fd = open_stat_pipe(); - st->loaded = 0; - st->buf[0] = '\0'; + t->ret = client_parse_config(argc, argv, &st->pcd); + if (t->ret < 0) + return; + t->ret = client_open(st->pcd); + if (t->ret < 0) + return; + st->pcd->task.event_handler = client_task_event_handler; + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; } - if (st->fd >= 0 && audiod_status != AUDIOD_OFF) - para_fd_set(st->fd, &s->rfds, &s->max_fileno); } -static void status_post_select(struct sched *s, struct task *t) +static void status_post_select(__a_unused struct sched *s, struct task *t) { struct status_task *st = t->private_data; t->ret = 1; - if (st->fd < 0 || !FD_ISSET(st->fd, &s->rfds)) - return; - t->ret = read(st->fd, st->buf + st->loaded, STRINGSIZE - 1 - st->loaded); - if (t->ret <= 0) { - if (!t->ret) - t->ret = -E_STATUS_EOF; + if (!st->pcd || !st->pcd->loaded + || st->pcd->status != CL_RECEIVING) return; - } - st->buf[t->ret + st->loaded] = '\0'; - st->loaded = for_each_line(st->buf, t->ret + st->loaded, + st->pcd->loaded = for_each_line(st->pcd->buf, st->pcd->loaded, &check_stat_line); } @@ -1022,7 +1035,6 @@ static void init_status_task(struct status_task *st) st->task.post_select = status_post_select; st->task.event_handler = status_event_handler; st->task.private_data = st; - st->fd = -1; st->sa_time_diff_sign = 1; sprintf(st->task.status, "status task"); } @@ -1052,14 +1064,12 @@ int main(int argc, char *argv[]) struct command_task command_task_struct, *cmd_task = &command_task_struct; struct task audiod_task_struct, *audiod_task = &audiod_task_struct; - init_sched(); - valid_fd_012(); - cmdline_parser(argc, argv, &conf); + audiod_cmdline_parser(argc, argv, &conf); para_drop_privileges(conf.user_arg, conf.group_arg); cf = configfile_exists(); if (cf) { - if (cmdline_parser_configfile(cf, &conf, 0, 0, 0)) { + if (audiod_cmdline_parser_configfile(cf, &conf, 0, 0, 0)) { PARA_EMERG_LOG("%s", "parse error in config file\n"); exit(EXIT_FAILURE); } @@ -1092,7 +1102,7 @@ int main(int argc, char *argv[]) register_task(&cmd_task->task); register_task(&stat_task->task); register_task(audiod_task); - s.default_timeout.tv_sec = 3; + s.default_timeout.tv_sec = 0; s.default_timeout.tv_usec = 99 * 1000; ret = sched(&s);