X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=afh_recv.c;h=e320fdee65627665d1247da98ddc68ff0786b364;hp=550ca424376ad2108557ef17159265e4667ad3eb;hb=69a294cd641c623db61f46ee86901845789a1c7b;hpb=618a25011420f434f05305a4053a49824d39b4a2 diff --git a/afh_recv.c b/afh_recv.c index 550ca424..e320fdee 100644 --- a/afh_recv.c +++ b/afh_recv.c @@ -1,3 +1,11 @@ +/* + * Copyright (C) 2011-2014 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file afh_recv.c Receiver for streaming local files. */ + #include #include #include @@ -12,6 +20,218 @@ #include "afh_recv.cmdline.h" #include "string.h" #include "fd.h" +#include "afh.h" + +struct private_afh_recv_data { + int fd; + void *map; + size_t map_size; + struct afh_info afhi; + int audio_format_num; + long unsigned first_chunk; + long unsigned last_chunk; + struct timeval stream_start; + uint32_t current_chunk; +}; + +static int afh_execute(struct btr_node *btrn, const char *cmd, char **result) +{ + struct receiver_node *rn = btr_context(btrn); + struct private_afh_recv_data *pard = rn->private_data; + + *result = NULL; + if (!strcmp(cmd, "seconds_total")) { + *result = make_message("%lu", pard->afhi.seconds_total); + return 1; + } + if (!strcmp(cmd, "chunks_total")) { + *result = make_message("%lu", pard->afhi.chunks_total); + return 1; + } + if (!strcmp(cmd, "afhi")) { + afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result); + return 1; + } + if (!strncmp(cmd, "repos", 5)) { + int32_t x; + int ret = para_atoi32(cmd + 5, &x); + if (ret < 0) + return ret; + if (x >= pard->afhi.chunks_total) + return -ERRNO_TO_PARA_ERROR(EINVAL); + pard->first_chunk = pard->current_chunk = x; + return 1; + } + return -E_BTR_NAVAIL; +} + +static void *afh_recv_parse_config(int argc, char **argv) +{ + struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp)); + + afh_recv_cmdline_parser(argc, argv, tmp); + return tmp; +} + +static void afh_recv_free_config(void *conf) +{ + if (!conf) + return; + afh_recv_cmdline_parser_free(conf); + free(conf); +} + +static int afh_recv_open(struct receiver_node *rn) +{ + struct afh_recv_args_info *conf = rn->conf; + struct private_afh_recv_data *pard; + struct afh_info *afhi; + char *filename = conf->filename_arg; + + int ret; + + if (!filename || *filename == '\0') + return -E_AFH_RECV_BAD_FILENAME; + rn->private_data = pard = para_calloc(sizeof(*pard)); + afhi = &pard->afhi; + ret = mmap_full_file(filename, O_RDONLY, &pard->map, + &pard->map_size, &pard->fd); + if (ret < 0) + goto out; + ret = compute_afhi(filename, pard->map, pard->map_size, + pard->fd, afhi); + if (ret < 0) + goto out_unmap; + pard->audio_format_num = ret; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (afhi->chunks_total == 0) + goto out_clear_afhi; + if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total) + goto out_clear_afhi; + if (conf->begin_chunk_arg >= 0) + pard->first_chunk = conf->begin_chunk_arg; + else + pard->first_chunk = afhi->chunks_total + conf->begin_chunk_arg; + if (conf->end_chunk_given) { + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total) + goto out_clear_afhi; + if (conf->end_chunk_arg >= 0) + pard->last_chunk = conf->end_chunk_arg; + else + pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg; + } else + pard->last_chunk = afhi->chunks_total - 1; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (pard->first_chunk >= pard->last_chunk) + goto out_clear_afhi; + pard->current_chunk = pard->first_chunk; + return pard->audio_format_num; +out_clear_afhi: + clear_afhi(afhi); +out_unmap: + para_munmap(pard->map, pard->map_size); + close(pard->fd); +out: + freep(&rn->private_data); + return ret; +} + +static void afh_recv_close(struct receiver_node *rn) +{ + struct private_afh_recv_data *pard; + + if (!rn || !rn->private_data) + return; + pard = rn->private_data; + clear_afhi(&pard->afhi); + para_munmap(pard->map, pard->map_size); + close(pard->fd); + freep(&rn->private_data); +} + +static void afh_recv_pre_select(struct sched *s, struct task *t) +{ + struct receiver_node *rn = task_context(t); + struct private_afh_recv_data *pard = rn->private_data; + struct afh_info *afhi = &pard->afhi; + struct afh_recv_args_info *conf = rn->conf; + struct timeval chunk_time; + int state = generic_recv_pre_select(s, t); + + if (state <= 0) + return; + if (!conf->just_in_time_given) { + sched_min_delay(s); + return; + } + compute_chunk_time(pard->current_chunk - pard->first_chunk, + &afhi->chunk_tv, &pard->stream_start, &chunk_time); + sched_request_barrier_or_min_delay(&chunk_time, s); +} + +static int afh_recv_post_select(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = task_context(t); + struct afh_recv_args_info *conf = rn->conf; + struct private_afh_recv_data *pard = rn->private_data; + struct btr_node *btrn = rn->btrn; + struct afh_info *afhi = &pard->afhi; + int ret; + char *buf; + const char *start, *end; + size_t size; + struct timeval chunk_time; + + ret = btr_node_status(btrn, 0, BTR_NT_ROOT); + if (ret <= 0) + goto out; + if (pard->first_chunk > 0 && !conf->no_header_given) { + char *header; + afh_get_header(afhi, pard->audio_format_num, pard->map, + pard->map_size, &header, &size); + if (size > 0) { + PARA_INFO_LOG("writing header (%zu bytes)\n", size); + buf = para_malloc(size); + memcpy(buf, header, size); + btr_add_output(buf, size, btrn); + afh_free_header(header, pard->audio_format_num); + } + } + if (!conf->just_in_time_given) { + afh_get_chunk(pard->first_chunk, afhi, pard->map, &start, &size); + afh_get_chunk(pard->last_chunk, afhi, pard->map, &end, &size); + end += size; + PARA_INFO_LOG("adding %zu bytes\n", end - start); + btr_add_output_dont_free(start, end - start, btrn); + ret = -E_RECV_EOF; + goto out; + } + if (pard->current_chunk == pard->first_chunk) + pard->stream_start = *now; + else { + compute_chunk_time(pard->current_chunk - pard->first_chunk, + &afhi->chunk_tv, &pard->stream_start, &chunk_time); + ret = tv_diff(&chunk_time, now, NULL); + if (ret > 0) + goto out; + } + afh_get_chunk(pard->current_chunk, afhi, pard->map, &start, &size); + PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk); + btr_add_output_dont_free(start, size, btrn); + if (pard->current_chunk >= pard->last_chunk) { + ret = -E_RECV_EOF; + goto out; + } + pard->current_chunk++; + ret = 1; +out: + if (ret < 0) { + btr_remove_node(&rn->btrn); + pard->current_chunk = pard->first_chunk; + } + return ret; +} /** * The init function of the afh receiver. @@ -24,18 +244,15 @@ void afh_recv_init(struct receiver *r) { struct afh_recv_args_info dummy; + afh_init(); afh_recv_cmdline_parser_init(&dummy); -#if 0 r->open = afh_recv_open; r->close = afh_recv_close; r->pre_select = afh_recv_pre_select; r->post_select = afh_recv_post_select; r->parse_config = afh_recv_parse_config; r->free_config = afh_recv_free_config; -#endif - r->help = (struct ggo_help) { - .short_help = afh_recv_args_info_help, - .detailed_help = afh_recv_args_info_detailed_help - }; + r->execute = afh_execute; + r->help = (struct ggo_help)DEFINE_GGO_HELP(afh_recv); afh_recv_cmdline_parser_free(&dummy); }