X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=afs.c;h=ccda7cdaaa4e211b696d4e898f08c7c2520ab3ef;hp=d7c1ecfded5adf25c0a9c81ed86667c614d4f66e;hb=0dd69d3988a677aeb8d0d3aea8364c664ac35fb9;hpb=7ba6e107eb38c5ceb6f66aac21ad6a5a3afa49ad diff --git a/afs.c b/afs.c index d7c1ecfd..ccda7cda 100644 --- a/afs.c +++ b/afs.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2007-2008 Andre Noll + * Copyright (C) 2007-2009 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ @@ -54,13 +54,13 @@ enum afs_table_num { }; static struct afs_table afs_tables[NUM_AFS_TABLES] = { - [TBLNUM_AUDIO_FILES] = {.init = aft_init}, - [TBLNUM_ATTRIBUTES] = {.init = attribute_init}, - [TBLNUM_SCORES] = {.init = score_init}, - [TBLNUM_MOODS] = {.init = moods_init}, - [TBLNUM_LYRICS] = {.init = lyrics_init}, - [TBLNUM_IMAGES] = {.init = images_init}, - [TBLNUM_PLAYLIST] = {.init = playlists_init}, + [TBLNUM_AUDIO_FILES] = {.init = aft_init, .name = "audio_files"}, + [TBLNUM_ATTRIBUTES] = {.init = attribute_init, .name = "attributes"}, + [TBLNUM_SCORES] = {.init = score_init, .name = "scores"}, + [TBLNUM_MOODS] = {.init = moods_init, .name = "moods"}, + [TBLNUM_LYRICS] = {.init = lyrics_init, .name = "lyrics"}, + [TBLNUM_IMAGES] = {.init = images_init, .name = "images"}, + [TBLNUM_PLAYLIST] = {.init = playlists_init, .name = "playlists"}, }; struct command_task { @@ -75,6 +75,7 @@ struct command_task { struct task task; }; +extern int mmd_mutex; extern struct misc_meta_data *mmd; static int server_socket; @@ -140,6 +141,36 @@ struct callback_result { size_t result_size; }; +static int dispatch_result(int result_shmid, callback_result_handler *handler, + void *private_result_data) +{ + struct osl_object result; + void *result_shm; + int ret2, ret = shm_attach(result_shmid, ATTACH_RO, &result_shm); + struct callback_result *cr = result_shm; + + if (ret < 0) { + PARA_ERROR_LOG("attach failed: %s\n", para_strerror(-ret)); + return ret; + } + result.size = cr->result_size; + result.data = result_shm + sizeof(*cr); + if (result.size) { + assert(handler); + ret = handler(&result, private_result_data); + if (ret < 0) + PARA_NOTICE_LOG("result handler error: %s\n", + para_strerror(-ret)); + } + ret2 = shm_detach(result_shm); + if (ret2 < 0) { + PARA_ERROR_LOG("detach failed: %s\n", para_strerror(-ret2)); + if (ret >= 0) + ret = ret2; + } + return ret; +} + /** * Ask the afs process to call a given function. * @@ -160,8 +191,7 @@ struct callback_result { * shmid are passed to that function as an osl object. The private_result_data * pointer is passed as the second argument to \a result_handler. * - * \return Negative, on errors, the return value of the callback function - * otherwise. + * \return Standard. * * \sa send_option_arg_callback_request(), send_standard_callback_request(). */ @@ -170,10 +200,11 @@ int send_callback_request(callback_function *f, struct osl_object *query, void *private_result_data) { struct callback_query *cq; - int num_results = 0, ret, fd = -1, query_shmid, result_shmid; - void *query_shm, *result_shm; + int ret, fd = -1, query_shmid, result_shmid; + void *query_shm; char buf[sizeof(afs_socket_cookie) + sizeof(int)]; size_t query_shm_size = sizeof(*cq); + int dispatch_error = 0; if (query) query_shm_size += query->size; @@ -204,45 +235,37 @@ int send_callback_request(callback_function *f, struct osl_object *query, ret = send_bin_buffer(fd, buf, sizeof(buf)); if (ret < 0) goto out; + /* + * Read all shmids from afs. + * + * Even if the dispatcher returns an error we _must_ continue to read + * shmids from fd so that we can destroy all shared memory areas that + * have been created for us by the afs process. + */ for (;;) { ret = recv_bin_buffer(fd, buf, sizeof(int)); if (ret <= 0) goto out; - if (ret != sizeof(int)) { - ret = -E_AFS_SHORT_READ; - goto out; - } + assert(ret == sizeof(int)); ret = *(int *) buf; - if (ret <= 0) - goto out; + assert(ret > 0); result_shmid = ret; - ret = shm_attach(result_shmid, ATTACH_RO, &result_shm); - if (ret >= 0) { - struct callback_result *cr = result_shm; - struct osl_object result; - num_results++; - result.size = cr->result_size; - result.data = result_shm + sizeof(*cr); - if (result.size) { - assert(result_handler); - ret = result_handler(&result, private_result_data); - if (shm_detach(result_shm) < 0) - PARA_ERROR_LOG("can not detach result\n"); - } - } else - PARA_ERROR_LOG("attach result failed: %d\n", ret); - if (shm_destroy(result_shmid) < 0) - PARA_ERROR_LOG("destroy result failed\n"); + if (!dispatch_error) { + ret = dispatch_result(result_shmid, result_handler, + private_result_data); + if (ret < 0) + dispatch_error = 1; + } + ret = shm_destroy(result_shmid); if (ret < 0) - break; + PARA_CRIT_LOG("destroy result failed: %s\n", + para_strerror(-ret)); } out: if (shm_destroy(query_shmid) < 0) - PARA_ERROR_LOG("%s\n", "shm destroy error"); + PARA_CRIT_LOG("shm destroy error\n"); if (fd >= 0) close(fd); - if (ret >= 0) - ret = num_results; // PARA_DEBUG_LOG("callback_ret: %d\n", ret); return ret; } @@ -499,7 +522,7 @@ static int pass_afd(int fd, char *buf, size_t size) * * \sa open_and_update_audio_file(). */ -int open_next_audio_file(void) +static int open_next_audio_file(void) { struct osl_row *aft_row; struct audio_file_data afd; @@ -571,15 +594,15 @@ static int activate_mood_or_playlist(char *arg, int *num_admissible) free(current_mop); if (arg) { current_mop = para_strdup(arg); - mmd_lock(); + mutex_lock(mmd_mutex); strncpy(mmd->afs_mode_string, arg, sizeof(mmd->afs_mode_string)); mmd->afs_mode_string[sizeof(mmd->afs_mode_string) - 1] = '\0'; - mmd_unlock(); + mutex_unlock(mmd_mutex); } else { - mmd_lock(); + mutex_lock(mmd_mutex); strcpy(mmd->afs_mode_string, "dummy"); - mmd_unlock(); + mutex_unlock(mmd_mutex); current_mop = NULL; } } @@ -747,59 +770,50 @@ static int open_afs_tables(void) return ret; } -static void unregister_tasks(void) -{ - unregister_task(&command_task_struct.task); - unregister_task(&signal_task_struct.task); -} - static void signal_pre_select(struct sched *s, struct task *t) { - struct signal_task *st = t->private_data; - t->ret = 1; + struct signal_task *st = container_of(t, struct signal_task, task); para_fd_set(st->fd, &s->rfds, &s->max_fileno); } -static void signal_post_select(struct sched *s, struct task *t) +static void afs_signal_post_select(struct sched *s, struct task *t) { - struct signal_task *st = t->private_data; - t->ret = -E_AFS_PARENT_DIED; - if (getppid() == 1) - goto err; - t->ret = 1; + struct signal_task *st = container_of(t, struct signal_task, task); + if (getppid() == 1) { + PARA_EMERG_LOG("para_server died\n"); + goto shutdown; + } if (!FD_ISSET(st->fd, &s->rfds)) return; st->signum = para_next_signal(); - t->ret = 1; - if (st->signum == SIGUSR1) - return; /* ignore SIGUSR1 */ if (st->signum == SIGHUP) { close_afs_tables(); - t->ret = open_afs_tables(); - if (t->ret < 0) - goto err; + parse_config_or_die(1); + t->error = open_afs_tables(); + if (t->error < 0) + return; init_admissible_files(current_mop); return; } - t->ret = -E_AFS_SIGNAL; -err: - PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret)); - unregister_tasks(); + PARA_EMERG_LOG("terminating on signal %d\n", st->signum); +shutdown: + sched_shutdown(); + t->error = -E_AFS_SIGNAL; } static void register_signal_task(void) { struct signal_task *st = &signal_task_struct; + + para_sigaction(SIGPIPE, SIG_IGN); st->fd = para_signal_init(); PARA_INFO_LOG("signal pipe: fd %d\n", st->fd); para_install_sighandler(SIGINT); para_install_sighandler(SIGTERM); - para_install_sighandler(SIGPIPE); para_install_sighandler(SIGHUP); st->task.pre_select = signal_pre_select; - st->task.post_select = signal_post_select; - st->task.private_data = st; + st->task.post_select = afs_signal_post_select; sprintf(st->task.status, "signal task"); register_task(&st->task); } @@ -818,14 +832,13 @@ struct afs_client { static void command_pre_select(struct sched *s, struct task *t) { - struct command_task *ct = t->private_data; + struct command_task *ct = container_of(t, struct command_task, task); struct afs_client *client; para_fd_set(server_socket, &s->rfds, &s->max_fileno); para_fd_set(ct->fd, &s->rfds, &s->max_fileno); list_for_each_entry(client, &afs_client_list, node) para_fd_set(client->fd, &s->rfds, &s->max_fileno); - t->ret = 1; } /** @@ -895,31 +908,27 @@ static int call_callback(int fd, int query_shmid) query.data = (char *)query_shm + sizeof(*cq); query.size = cq->query_size; cq->handler(fd, &query); - return 1; + return shm_detach(query_shm); } -static void execute_server_command(void) +static int execute_server_command(void) { char buf[8]; int ret = recv_bin_buffer(server_socket, buf, sizeof(buf) - 1); if (ret <= 0) { - if (ret < 0) - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); - return; + if (!ret) + ret = -ERRNO_TO_PARA_ERROR(ECONNRESET); + goto err; } buf[ret] = '\0'; PARA_DEBUG_LOG("received: %s\n", buf); - if (!strcmp(buf, "new")) { - ret = open_next_audio_file(); - if (ret < 0) { - PARA_EMERG_LOG("%s\n", para_strerror(-ret)); - unregister_tasks(); - } - return; - } - PARA_ERROR_LOG("unknown command\n"); - + ret = -E_BAD_CMD; + if (strcmp(buf, "new")) + goto err; + ret = open_next_audio_file(); +err: + return ret; } static void execute_afs_command(int fd, uint32_t expected_cookie) @@ -960,12 +969,19 @@ err: static void command_post_select(struct sched *s, struct task *t) { - struct command_task *ct = t->private_data; + struct command_task *ct = container_of(t, struct command_task, task); struct sockaddr_un unix_addr; struct afs_client *client, *tmp; - int fd; - if (FD_ISSET(server_socket, &s->rfds)) - execute_server_command(); + int fd, ret; + + if (FD_ISSET(server_socket, &s->rfds)) { + ret = execute_server_command(); + if (ret < 0) { + PARA_EMERG_LOG("%s\n", para_strerror(-ret)); + sched_shutdown(); + return; + } + } /* Check the list of connected clients. */ list_for_each_entry_safe(client, tmp, &afs_client_list, node) { @@ -984,25 +1000,23 @@ static void command_post_select(struct sched *s, struct task *t) } /* Accept connections on the local socket. */ if (!FD_ISSET(ct->fd, &s->rfds)) - goto out; - t->ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr)); - if (t->ret < 0) { - PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret)); - goto out; + return; + ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr)); + if (ret < 0) { + PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); + return; } - fd = t->ret; - t->ret = mark_fd_nonblocking(fd); - if (t->ret < 0) { - PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret)); + fd = ret; + ret = mark_fd_nonblocking(fd); + if (ret < 0) { + PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); close(fd); - goto out; + return; } client = para_malloc(sizeof(*client)); client->fd = fd; client->connect_time = *now; para_list_add(&client->node, &afs_client_list); -out: - t->ret = 1; } static void register_command_task(uint32_t cookie) @@ -1013,17 +1027,10 @@ static void register_command_task(uint32_t cookie) ct->task.pre_select = command_pre_select; ct->task.post_select = command_post_select; - ct->task.private_data = ct; sprintf(ct->task.status, "command task"); register_task(&ct->task); } -static void register_tasks(uint32_t cookie) -{ - register_signal_task(); - register_command_task(cookie); -} - /** * Initialize the audio file selector process. * @@ -1032,32 +1039,32 @@ static void register_tasks(uint32_t cookie) */ __noreturn void afs_init(uint32_t cookie, int socket_fd) { - struct sched s; + static struct sched s; int i, ret; + register_signal_task(); INIT_LIST_HEAD(&afs_client_list); for (i = 0; i < NUM_AFS_TABLES; i++) afs_tables[i].init(&afs_tables[i]); ret = open_afs_tables(); - - if (ret < 0) { - PARA_EMERG_LOG("%s\n", para_strerror(-ret)); - exit(EXIT_FAILURE); - } + if (ret < 0) + goto out; server_socket = socket_fd; ret = mark_fd_nonblocking(server_socket); if (ret < 0) - exit(EXIT_FAILURE); + goto out_close; PARA_INFO_LOG("server_socket: %d, afs_socket_cookie: %u\n", server_socket, (unsigned) cookie); init_admissible_files(conf.afs_initial_mode_arg); - register_tasks(cookie); + register_command_task(cookie); s.default_timeout.tv_sec = 0; s.default_timeout.tv_usec = 999 * 1000; ret = schedule(&s); +out_close: + close_afs_tables(); +out: if (ret < 0) PARA_EMERG_LOG("%s\n", para_strerror(-ret)); - close_afs_tables(); exit(EXIT_FAILURE); } @@ -1065,7 +1072,7 @@ static void create_tables_callback(int fd, const struct osl_object *query) { uint32_t table_mask = *(uint32_t *)query->data; int i, ret; - char *buf; + struct para_buffer pb = {.buf = NULL}; close_afs_tables(); for (i = 0; i < NUM_AFS_TABLES; i++) { @@ -1078,15 +1085,15 @@ static void create_tables_callback(int fd, const struct osl_object *query) ret = t->create(database_dir); if (ret < 0) goto out; + para_printf(&pb, "successfully created %s table\n", t->name); } ret = open_afs_tables(); out: - if (ret >= 0) - buf = make_message("successfully created afs table(s)\n"); - else - buf = make_message("%s\n", para_strerror(-ret)); - pass_buffer_as_shm(buf, strlen(buf), &fd); - free(buf); + if (ret < 0) + para_printf(&pb, "%s\n", para_strerror(-ret)); + if (pb.buf) + pass_buffer_as_shm(pb.buf, pb.offset, &fd); + free(pb.buf); } int com_init(int fd, int argc, char * const * const argv) @@ -1114,7 +1121,7 @@ int com_init(int fd, int argc, char * const * const argv) return -E_BAD_TABLE_NAME; } } - ret = send_callback_request(create_tables_callback, &query, NULL, NULL); + ret = send_callback_request(create_tables_callback, &query, &send_result, &fd); if (ret < 0) return send_va_buffer(fd, "%s\n", para_strerror(-ret)); return ret; @@ -1183,6 +1190,16 @@ int com_check(int fd, int argc, char * const * const argv) return 1; } +/** + * The afs event dispatcher. + * + * \param event Type of the event. + * \param pb May be \p NULL. + * \param data Type depends on \a event. + * + * This function calls the table handlers of all tables and passes \a pb and \a + * data verbatim. It's up to the handlers to interpret the \a data pointer. + */ void afs_event(enum afs_events event, struct para_buffer *pb, void *data) { @@ -1194,7 +1211,8 @@ void afs_event(enum afs_events event, struct para_buffer *pb, continue; ret = t->event_handler(event, pb, data); if (ret < 0) - PARA_CRIT_LOG("%s\n", para_strerror(-ret)); + PARA_CRIT_LOG("table %s, event %d: %s\n", t->name, + event, para_strerror(-ret)); } }