From: Andre Date: Thu, 25 May 2006 21:57:48 +0000 (+0200) Subject: convert para_audiod to the new scheduler. X-Git-Tag: v0.2.14~101^2~21 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=899a19c03fc05f6fd98687e3eaa86841a4633a10 convert para_audiod to the new scheduler. As expected, this was a bit harder. It works, but I'm sure there are plenty of bugs left. Fortnunately, this doesn't matter much right now, because the next step will be the integration of the writers into the para_audiod executable. This will lead to major code simplifications, so let's defer the bug-hunting until this integration is complete. --- diff --git a/audiod.c b/audiod.c index db4d478c..fcce8ae9 100644 --- a/audiod.c +++ b/audiod.c @@ -75,62 +75,36 @@ struct audio_format_info { * \sa receier_node, receiver, filter, filter_node, filter_chain_info */ struct slot_info { -/** number of the audio format in this slot */ + /** number of the audio format in this slot */ int format; -/** the file descriptor of the writer */ + /** the file descriptor of the writer */ int write_fd; -/** the process id of the writer */ + /** the process id of the writer */ pid_t wpid; -/** time of the last successful read from the receiver */ + /** time of the last successful read from the receiver */ struct timeval rtime; -/** time the last write to the write fd happend */ + /** time the last write to the write fd happend */ struct timeval wtime; -/** writer start time */ + /** writer start time */ struct timeval wstime; -/** did we include \a write_fd in the fdset */ + /** did we include \a write_fd in the fdset */ int wcheck; -/** set to one if we have sent the TERM signal to \a wpid */ + /** set to one if we have sent the TERM signal to \a wpid */ int wkilled; -/** the receiver info associated with this slot */ + /** the receiver info associated with this slot */ struct receiver_node *receiver_node; -/** the active filter chain */ - struct filter_chain_info *fci; + /** the active filter chain */ + struct filter_chain *fc; }; - static struct slot_info slot[MAX_STREAM_SLOTS]; -/** defines one command of para_audiod */ -struct audiod_command { -/** the name of the command */ -const char *name; -/** pointer to the function that handles the command */ -int (*handler)(int, int, char**); -int (*line_handler)(int, char*); -/** one-line description of the command */ -const char *description; -/** summary of the command line options */ -const char *synopsis; -/** the long help text */ -const char *help; -}; - extern const char *status_item_list[NUM_STAT_ITEMS]; -static int com_grab(int, char *); -static int com_cycle(int, int, char **); -static int com_help(int, int, char **); -static int com_off(int, int, char **); -static int com_on(int, int, char **); -static int com_sb(int, int, char **); -static int com_stat(int, int, char **); -static int com_term(int, int, char **); -static int stat_pipe = -1, signal_pipe; - static struct gengetopt_args_info conf; static struct timeval server_stream_start, sa_time_diff; static int playing, current_decoder = -1, audiod_status = AUDIOD_ON, offset_seconds, length_seconds, - sa_time_diff_sign = 1, audiod_socket = -1; + sa_time_diff_sign = 1; static char *af_status, /* the audio format announced in server status */ *socket_name, *hostname; static char *stat_item_values[NUM_STAT_ITEMS]; @@ -139,6 +113,56 @@ static const struct timeval restart_delay = {0, 300 * 1000}; static struct audio_format_info afi[NUM_AUDIO_FORMATS]; +static struct signal_task signal_task_struct, *sig_task = &signal_task_struct; + +struct command_task { + int fd; + struct task task; +}; +static struct command_task command_task_struct, *cmd_task = &command_task_struct; + +struct status_task { + int fd; + struct task task; + char buf[STRINGSIZE]; + unsigned loaded; +}; +static struct status_task status_task_struct, *stat_task = &status_task_struct; + +struct audiod_task { + struct task task; +}; +static struct audiod_task audiod_task_struct, *at = &audiod_task_struct; + +struct signal_task { + int fd; + int signum; + struct task task; +}; + + +/** defines one command of para_audiod */ +struct audiod_command { + /** the name of the command */ + const char *name; + /** pointer to the function that handles the command */ + int (*handler)(int, int, char**); + int (*line_handler)(int, char*); + /** one-line description of the command */ + const char *description; + /** summary of the command line options */ + const char *synopsis; + /** the long help text */ + const char *help; +}; +static int com_grab(int, char *); +static int com_cycle(int, int, char **); +static int com_help(int, int, char **); +static int com_off(int, int, char **); +static int com_on(int, int, char **); +static int com_sb(int, int, char **); +static int com_stat(int, int, char **); +static int com_term(int, int, char **); static struct audiod_command cmds[] = { { .name = "cycle", @@ -235,7 +259,7 @@ static struct audiod_command cmds[] = { }; /** iterate over all slots */ -#define FOR_EACH_SLOT(slot) for (slot = 0; slot < MAX_STREAM_SLOTS; slot++) +#define FOR_EACH_SLOT(_slot) for (_slot = 0; _slot < MAX_STREAM_SLOTS; _slot++) /** iterate over all supported audio formats */ #define FOR_EACH_AUDIO_FORMAT(af) for (af = 0; af < NUM_AUDIO_FORMATS; af++) /** iterate over the array of all audiod commands */ @@ -391,8 +415,8 @@ static char *configfile_exists(void) static void setup_signal_handling(void) { - signal_pipe = para_signal_init(); - PARA_INFO_LOG("signal pipe: fd %d\n", signal_pipe); + sig_task->fd = para_signal_init(); + PARA_INFO_LOG("signal pipe: fd %d\n", sig_task->fd); para_install_sighandler(SIGINT); para_install_sighandler(SIGTERM); para_install_sighandler(SIGCHLD); @@ -472,8 +496,10 @@ static void close_receiver(int slot_num) if (s->format < 0 || !s->receiver_node) return; a = &afi[s->format]; - PARA_NOTICE_LOG("closing %s recevier in slot %d\n", - audio_formats[s->format] , slot_num); + PARA_NOTICE_LOG("closing %s recevier in slot %d (eof = %d)\n", + audio_formats[s->format] , slot_num, s->receiver_node->eof); + if (!s->receiver_node->eof) + unregister_task(&s->receiver_node->task); a->receiver->close(s->receiver_node); free(s->receiver_node); s->receiver_node = NULL; @@ -536,7 +562,7 @@ static int get_empty_slot(void) continue; if (s->receiver_node) continue; - if (s->fci) + if (s->fc) continue; clear_slot(i); return i; @@ -563,12 +589,12 @@ static void close_stat_pipe(void) { int i; - if (stat_pipe < 0) + if (stat_task->fd < 0) return; PARA_NOTICE_LOG("%s", "closing status pipe\n"); - close(stat_pipe); - del_close_on_fork_list(stat_pipe); - stat_pipe = -1; + close(stat_task->fd); + del_close_on_fork_list(stat_task->fd); + stat_task->fd = -1; kill_all_decoders(); for (i = 0; i < NUM_STAT_ITEMS; i++) { free(stat_item_values[i]); @@ -590,7 +616,7 @@ static void __noreturn clean_exit(int status, const char *msg) kill_all_decoders(); if (socket_name) unlink(socket_name); - if (stat_pipe >= 0) + if (stat_task->fd >= 0) close_stat_pipe(); exit(status); } @@ -626,38 +652,53 @@ int num_filters(int audio_format_num) return afi[audio_format_num].num_filters; } +void filter_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} + static void open_filters(int slot_num) { struct slot_info *s = &slot[slot_num]; struct audio_format_info *a = &afi[s->format]; int nf = a->num_filters; int i; + static int output_eof; /* FIXME */ - s->fci = para_calloc(sizeof(struct filter_chain_info)); - INIT_LIST_HEAD(&s->fci->filters); + s->fc = para_calloc(sizeof(struct filter_chain)); + INIT_LIST_HEAD(&s->fc->filters); if (!nf) return; - s->fci->inbuf = s->receiver_node->buf; - s->fci->in_loaded = &s->receiver_node->loaded; - s->fci->outbuf = s->receiver_node->buf; - s->fci->out_loaded = &s->receiver_node->loaded; - s->fci->eof = &s->receiver_node->eof; + s->fc->inbuf = s->receiver_node->buf; + s->fc->in_loaded = &s->receiver_node->loaded; + s->fc->input_eof = &s->receiver_node->eof; + s->fc->output_eof = &output_eof; + output_eof = 0; + + s->fc->task.pre_select = filter_pre_select; + s->fc->task.event_handler = filter_event_handler; + s->fc->task.private_data = s->fc; + s->fc->task.flags = 0; + s->fc->eof = 0; + sprintf(s->fc->task.status, "filter chain"); for (i = 0; i < nf; i++) { struct filter_node *fn = para_calloc(sizeof(struct filter_node)); fn->conf = a->filter_conf[i]; - fn->fci = s->fci; + fn->fc = s->fc; fn->filter = a->filters[i]; INIT_LIST_HEAD(&fn->callbacks); - list_add_tail(&fn->node, &s->fci->filters); + list_add_tail(&fn->node, &s->fc->filters); fn->filter->open(fn); PARA_NOTICE_LOG("%s filter %d/%d (%s) started in slot %d\n", audio_formats[s->format], i + 1, nf, fn->filter->name, slot_num); - s->fci->outbuf = fn->buf; - s->fci->out_loaded = &fn->loaded; + s->fc->outbuf = fn->buf; + s->fc->out_loaded = &fn->loaded; } - PARA_DEBUG_LOG("output buffer for filter chain %p: %p\n", s->fci, - s->fci->outbuf); + register_task(&s->fc->task); +// PARA_DEBUG_LOG("output loaded for filter chain %p: %p\n", s->fc, +// s->fc->out_loaded); } static struct filter_node *find_filter_node(int slot_num, int format, int filternum) @@ -667,7 +708,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - if (s->format < 0 || !s->fci) + if (s->format < 0 || !s->fc) continue; if (slot_num >= 0 && slot_num != i) continue; @@ -677,7 +718,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter continue; /* success */ j = 1; - list_for_each_entry(fn, &s->fci->filters, node) + list_for_each_entry(fn, &s->fc->filters, node) if (filternum <= 0 || j++ == filternum) break; return fn; @@ -710,14 +751,21 @@ static void start_stream_writer(int slot_num) mark_fd_nonblock(s->write_fd); gettimeofday(&s->wstime, NULL); current_decoder = slot_num; - activate_inactive_grab_clients(slot_num, s->format, &s->fci->filters); + activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters); } +void rn_event_handler(struct task *t) +{ +// struct receiver_node *rn = t->private_data; + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} static void open_receiver(int format) { struct audio_format_info *a = &afi[format]; struct slot_info *s; int ret, slot_num; + struct receiver_node *rn; slot_num = get_empty_slot(); if (slot_num < 0) @@ -727,7 +775,9 @@ static void open_receiver(int format) gettimeofday(&s->rtime, NULL); s->wtime = s->rtime; s->receiver_node = para_calloc(sizeof(struct receiver_node)); - s->receiver_node->conf = a->receiver_conf; + rn = s->receiver_node; + rn->receiver = a->receiver; + rn->conf = a->receiver_conf; ret = a->receiver->open(s->receiver_node); if (ret < 0) { PARA_ERROR_LOG("failed to open receiver (%s)\n", @@ -738,6 +788,14 @@ static void open_receiver(int format) } PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n", audio_formats[s->format], a->receiver->name, slot_num); + rn->task.private_data = s->receiver_node; + PARA_NOTICE_LOG("rn = %p\n", rn->task.private_data); + rn->task.pre_select = a->receiver->pre_select; + rn->task.post_select = a->receiver->post_select; + rn->task.event_handler = rn_event_handler; + rn->task.flags = 0; + sprintf(rn->task.status, "receiver node"); + register_task(&rn->task); } static int is_frozen(int format) @@ -880,18 +938,13 @@ static void check_timeouts(void) now.tv_sec > s->rtime.tv_sec + timeout) { PARA_INFO_LOG("%s input buffer (slot %d) not ready\n", audio_formats[s->format], slot_num); - if (s->fci) - s->fci->error = 42; - else - close_receiver(slot_num); + s->receiver_node->eof = 1; } /* check write time */ if (s->wpid > 0 && !s->wkilled && now.tv_sec > s->wtime.tv_sec + timeout) { PARA_INFO_LOG("%s output buffer (slot %d) not ready\n", audio_formats[s->format], slot_num); - if (s->fci) - s->fci->error = 42; kill_stream_writer(slot_num); } } @@ -907,8 +960,8 @@ static size_t get_loaded_bytes(int slot_num) goto out; if (afi[s->format].num_filters) { - if (s->fci) - loaded = *s->fci->out_loaded; + if (s->fc) + loaded = *s->fc->out_loaded; } else { if (rn) loaded = rn->loaded; @@ -917,6 +970,20 @@ out: return loaded; } +static void close_writer(int slot_num) +{ + struct slot_info *s = &slot[slot_num]; + if (s->write_fd > 0) { + PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num, + s->write_fd); + close(s->write_fd); + del_close_on_fork_list(s->write_fd); + s->write_fd = -1; + } + if (s->fc) + *s->fc->output_eof = 1; /* FIXME */ +} + static void close_decoder_if_idle(int slot_num) { @@ -925,59 +992,52 @@ static void close_decoder_if_idle(int slot_num) if (s->format < 0) return; - if (!s->fci) + if (!s->fc) return; if (!rn->eof && !s->fc->eof && s->wpid > 0) return; - if (!s->fci->eof && s->wpid > 0) { /* eof */ - if (filter_io(s->fci) > 0) - return; - if (get_loaded_bytes(slot_num)) - return; - } - if (s->write_fd > 0) { - PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num, - s->write_fd); - close(s->write_fd); - del_close_on_fork_list(s->write_fd); - s->write_fd = -1; - } + if (!s->fc->eof && s->wpid > 0 && get_loaded_bytes(slot_num)) + return; + close_writer(slot_num); if (s->wpid > 0) return; /* wait until writer dies before closing filters */ PARA_INFO_LOG("closing all filters in slot %d (filter_chain %p)\n", - slot_num, s->fci); - close_filters(s->fci); - free(s->fci); + slot_num, s->fc); + close_filters(s->fc); + free(s->fc); close_receiver(slot_num); clear_slot(slot_num); } -static void set_stream_fds(fd_set *wfds, int *max_fileno) +static void audiod_pre_select(struct sched *s, struct task *t) { int i; + if (audiod_status != AUDIOD_ON) + kill_all_decoders(); + else if (playing) + start_current_receiver(); check_timeouts(); FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; struct audio_format_info *a; struct receiver_node *rn; close_decoder_if_idle(i); - s->wcheck = 0; - if (s->format < 0) + slot[i].wcheck = 0; + if (slot[i].format < 0) continue; - a = &afi[s->format]; - rn = s->receiver_node; - if (rn && rn->loaded && !s->wpid) { + a = &afi[slot[i].format]; + rn = slot[i].receiver_node; + if (rn && rn->loaded && !slot[i].wpid) { PARA_INFO_LOG("no writer in slot %d\n", i); start_stream_writer(i); } - if (s->write_fd <= 0) + if (slot[i].write_fd <= 0) continue; if (!get_loaded_bytes(i)) continue; - para_fd_set(s->write_fd, wfds, max_fileno); - s->wcheck = 1; + para_fd_set(slot[i].write_fd, &s->wfds, &s->max_fileno); + slot[i].wcheck = 1; } } @@ -986,60 +1046,63 @@ static int write_audio_data(int slot_num) struct slot_info *s = &slot[slot_num]; struct audio_format_info *a = &afi[s->format]; struct receiver_node *rn = s->receiver_node; - int rv; + int ret; char **buf; size_t *len; if (a->num_filters) { - buf = &s->fci->outbuf; - len = s->fci->out_loaded; + buf = &s->fc->outbuf; + len = s->fc->out_loaded; } else { buf = &rn->buf; len = &rn->loaded; } PARA_DEBUG_LOG("writing %p (%zd bytes)\n", *buf, *len); - rv = write(s->write_fd, *buf, *len); - PARA_DEBUG_LOG("wrote %d/%zd\n", rv, *len); - if (rv < 0) { + ret = write(s->write_fd, *buf, *len); + PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *len); + if (ret < 0) { PARA_WARNING_LOG("write error in slot %d (fd %d): %s\n", slot_num, s->write_fd, strerror(errno)); *len = 0; - s->fci->error = E_WRITE_AUDIO_DATA; - } else if (rv != *len) { + close_writer(slot_num); +// s->fc->error = E_WRITE_AUDIO_DATA; + } else if (ret != *len) { PARA_DEBUG_LOG("partial %s write (%i/%zd) for slot %d\n", - audio_formats[s->format], rv, *len, slot_num); - *len -= rv; - memmove(*buf, *buf + rv, *len); + audio_formats[s->format], ret, *len, slot_num); + *len -= ret; + memmove(*buf, *buf + ret, *len); } else *len = 0; - if (rv > 0) + if (ret > 0) gettimeofday(&s->wtime, NULL); - return rv; + return ret; } -static void slot_io(fd_set *wfds) +static void audiod_post_select(struct sched *s, struct task *t) { int ret, i; FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct receiver_node *rn = s->receiver_node; + struct receiver_node *rn = slot[i].receiver_node; if (rn && rn->loaded) - gettimeofday(&s->rtime, NULL); - if (s->format >= 0 && s->write_fd > 0 && s->fci) { - ret = filter_io(s->fci); - if (ret < 0) - s->fci->error = -ret; -// PARA_DEBUG_LOG("slot %d, filter io %d bytes, check write: %d, loaded: %d/%d, eof: %d\n", -// i, ret, s->wcheck, rn->loaded, *s->fci->out_loaded, rn->eof); - } - if (s->write_fd <= 0 || !s->wcheck || !FD_ISSET(s->write_fd, wfds)) + slot[i].rtime = s->now; + if (slot[i].write_fd <= 0 || !slot[i].wcheck + || !FD_ISSET(slot[i].write_fd, &s->wfds)) continue; - write_audio_data(i); + ret = write_audio_data(i); } } +static void init_audiod_task(struct audiod_task *at) +{ + at->task.pre_select = audiod_pre_select; + at->task.post_select = audiod_post_select; + at->task.private_data = at; + at->task.flags = 0; + sprintf(at->task.status, "audiod task"); +} + static int parse_stream_command(const char *txt, char **cmd) { char *p = strchr(txt, ':'); @@ -1435,7 +1498,7 @@ static int handle_connect(void) char *cmd = NULL, *p, *buf = para_calloc(MAXLINE), **argv = NULL; struct sockaddr_un unix_addr; - ret = para_accept(audiod_socket, &unix_addr, sizeof(struct sockaddr_un)); + ret = para_accept(cmd_task->fd, &unix_addr, sizeof(struct sockaddr_un)); if (ret < 0) goto out; clifd = ret; @@ -1501,17 +1564,17 @@ static void audiod_get_socket(void) PARA_NOTICE_LOG("connecting to local socket %s\n", socket_name); if (conf.force_given) unlink(socket_name); - audiod_socket = create_pf_socket(socket_name, &unix_addr, + cmd_task->fd = create_pf_socket(socket_name, &unix_addr, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IWOTH); - if (audiod_socket < 0) { + if (cmd_task->fd < 0) { PARA_EMERG_LOG("%s", "can not connect to socket\n"); exit(EXIT_FAILURE); /* do not unlink socket */ } - if (listen(audiod_socket, 5) < 0) { + if (listen(cmd_task->fd , 5) < 0) { PARA_EMERG_LOG("%s", "can not listen on socket\n"); exit(EXIT_FAILURE); /* do not unlink socket */ } - add_close_on_fork_list(audiod_socket); + add_close_on_fork_list(cmd_task->fd); } static int open_stat_pipe(void) @@ -1528,48 +1591,121 @@ static int open_stat_pipe(void) return ret; } -static void audiod_pre_select(fd_set *rfds, fd_set *wfds, struct timeval *tv, - int *max_fileno) +void signal_event_handler(struct task *t) { - int i, ret; + struct signal_task *st = t->private_data; + handle_signal(st->signum); +} - FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct audio_format_info *a; - struct receiver_node *rn = s->receiver_node; - if (s->format < 0 || !rn) - continue; - a = &afi[s->format]; - ret = a->receiver->pre_select(rn, rfds, wfds, tv); -// PARA_NOTICE_LOG("%s preselect: %d\n", a->receiver->name, ret); - *max_fileno = PARA_MAX(*max_fileno, ret); +void signal_pre_select(struct sched *s, struct task *t) +{ + struct signal_task *st = t->private_data; + t->ret = 1; + para_fd_set(st->fd, &s->rfds, &s->max_fileno); +} + +void signal_post_select(struct sched *s, struct task *t) +{ + struct signal_task *st = t->private_data; + t->ret = 1; + if (!FD_ISSET(st->fd, &s->rfds)) + return; + t->ret = -E_SIGNAL_CAUGHT; + st->signum = para_next_signal(); +} + +void signal_setup_default(struct signal_task *st) +{ + st->task.pre_select = signal_pre_select; + st->task.post_select = signal_post_select; + st->task.private_data = st; + st->task.flags = 0; + sprintf(st->task.status, "signal task"); +} + +static void command_pre_select(struct sched *s, struct task *t) +{ + struct command_task *ct = t->private_data; + para_fd_set(ct->fd, &s->rfds, &s->max_fileno); + +} + +static void command_post_select(struct sched *s, struct task *t) +{ + int ret; + struct command_task *ct = t->private_data; + + if (audiod_status != AUDIOD_OFF) + audiod_status_dump(); + t->ret = 1; /* always successful */ + if (!FD_ISSET(ct->fd, &s->rfds)) + return; + ret = handle_connect(); + if (ret < 0) + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); +} + +void init_command_task(struct command_task *ct) +{ + ct->task.pre_select = command_pre_select; + ct->task.post_select = command_post_select; + ct->task.private_data = ct; + ct->task.flags = 0; + sprintf(ct->task.status, "command task"); +} + +static void status_pre_select(struct sched *s, struct task *t) +{ + struct status_task *st = t->private_data; + t->ret = 1; + if (st->fd >= 0 && audiod_status == AUDIOD_OFF) + close_stat_pipe(); + if (st->fd < 0 && audiod_status != AUDIOD_OFF) { + st->fd = open_stat_pipe(); + st->loaded = 0; + st->buf[0] = '\0'; } + if (st->fd >= 0 && audiod_status != AUDIOD_OFF) + para_fd_set(st->fd, &s->rfds, &s->max_fileno); } -static void audiod_post_select(int select_ret, fd_set *rfds, fd_set *wfds) + +static void status_post_select(struct sched *s, struct task *t) { - int i, ret; + struct status_task *st = t->private_data; + int ret; - FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct audio_format_info *a; - struct receiver_node *rn = s->receiver_node; - if (s->format < 0 || !rn || rn->eof) - continue; - a = &afi[s->format]; - ret = a->receiver->post_select(rn, select_ret, rfds, wfds); - if (ret <= 0) { - if (ret) - PARA_ERROR_LOG("%s post select failed: %s (slot %d)\n", - a->receiver->name, PARA_STRERROR(-ret), i); - else - PARA_INFO_LOG("eof in slot %d\n", i); - rn->eof = 1; - } - if (ret < 0 && s->fci) - s->fci->error = ret; + t->ret = 1; + if (st->fd < 0 || !FD_ISSET(st->fd, &s->rfds)) + return; + ret = read(st->fd, st->buf + st->loaded, + STRINGSIZE - 1 - st->loaded); + if (ret <= 0) { + close_stat_pipe(); + /* avoid busy loop if server is down */ + while (sleep(1) > 0) + ; /* try again*/ + } else { + st->buf[ret + st->loaded] = '\0'; + st->loaded = for_each_line(st->buf, ret + st->loaded, + &check_stat_line); } } +static void init_status_task(struct status_task *st) +{ + st->task.pre_select = status_pre_select; + st->task.post_select = status_post_select; + st->task.private_data = st; + st->task.flags = 0; + st->loaded = 0; + st->fd = -1; + st->buf[0] = '\0'; + sprintf(st->task.status, "status task"); +} + + + +#if 0 /* TODO: move everything before the select call to pre_select() */ static void __noreturn audiod_mainloop(void) { @@ -1577,6 +1713,9 @@ static void __noreturn audiod_mainloop(void) int ret, max_fileno, sbo = 0; char status_buf[STRINGSIZE] = ""; struct timeval tv; + + + repeat: FD_ZERO(&wfds); FD_ZERO(&rfds); @@ -1640,6 +1779,7 @@ repeat: } goto repeat; } +#endif static void set_initial_status(void) { @@ -1658,10 +1798,13 @@ static void set_initial_status(void) PARA_WARNING_LOG("%s", "invalid mode\n"); } -int __noreturn main(int argc, char *argv[]) +int main(int argc, char *argv[]) { char *cf; - int i; + int ret, i; + struct sched s; + + init_sched(); valid_fd_012(); hostname = para_hostname(); @@ -1691,5 +1834,22 @@ int __noreturn main(int argc, char *argv[]) if (conf.daemon_given) daemon_init(); audiod_get_socket(); /* doesn't return on errors */ - audiod_mainloop(); + + signal_setup_default(sig_task); + sig_task->task.event_handler = signal_event_handler; + + init_status_task(stat_task); + init_command_task(cmd_task); + init_audiod_task(at); + + register_task(&sig_task->task); + register_task(&cmd_task->task); + register_task(&stat_task->task); + register_task(&at->task); + s.default_timeout.tv_sec = 0; + s.default_timeout.tv_usec = 999 * 1000; + ret = sched(&s); + + PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret)); + return EXIT_FAILURE; } diff --git a/configure.ac b/configure.ac index 5b3e2b06..5844b7a7 100644 --- a/configure.ac +++ b/configure.ac @@ -67,7 +67,7 @@ filter_ldflags="" audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline http_recv.cmdline dccp_recv.cmdline" audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net - time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common fd" + time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common fd sched" audiod_ldflags="" server_cmdline_objs="server.cmdline" diff --git a/error.h b/error.h index c4d9e5fa..8458ae2b 100644 --- a/error.h +++ b/error.h @@ -196,6 +196,7 @@ extern const char **para_errlist[]; PARA_ERROR(SIGNAL_READ, "read error from signal pipe"), \ PARA_ERROR(WAITPID, "waitpid error"), \ PARA_ERROR(SIGNAL_PIPE, "failed to setup signal pipe"), \ + PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \ #define STRING_ERRORS \ diff --git a/filter.c b/filter.c index cfa2e94f..ee7e6dbc 100644 --- a/filter.c +++ b/filter.c @@ -169,8 +169,8 @@ int main(int argc, char *argv[]) ret = sched(&s); out: free(sit->buf); + close_filters(fc); if (ret < 0) PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret)); - close_filters(fc); return ret < 0? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/grab_client.c b/grab_client.c index 4f90a27a..f505fa87 100644 --- a/grab_client.c +++ b/grab_client.c @@ -27,6 +27,7 @@ #include "close_on_fork.h" #include "grab_client.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "grab_client.h" #include "audiod.h" diff --git a/http_recv.c b/http_recv.c index b4d38602..b566acf3 100644 --- a/http_recv.c +++ b/http_recv.c @@ -179,6 +179,7 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; + PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn); ret = -E_HOST_INFO; if (!(he = get_host_info(conf->host_arg))) goto err_out; diff --git a/wav.c b/wav.c index 0b249715..5ab3bb39 100644 --- a/wav.c +++ b/wav.c @@ -96,7 +96,7 @@ static void wav_open(struct filter_node *fn) fn->private_data = para_malloc(sizeof(int)); bof = fn->private_data; *bof = 1; - PARA_DEBUG_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n", + PARA_INFO_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n", fn, fn->buf, fn->loaded); }