X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=afh_recv.c;h=6525209bff4119a177eb443f154b4f1f5224845b;hp=550ca424376ad2108557ef17159265e4667ad3eb;hb=f4019242252bf8e7594a72efdac6214a5abd4364;hpb=618a25011420f434f05305a4053a49824d39b4a2 diff --git a/afh_recv.c b/afh_recv.c index 550ca424..6525209b 100644 --- a/afh_recv.c +++ b/afh_recv.c @@ -1,41 +1,249 @@ +/* Copyright (C) 2011 Andre Noll , see file COPYING. */ + +/** \file afh_recv.c Receiver for streaming local files. */ + #include #include -#include +#include +#include "recv_cmd.lsg.h" #include "para.h" #include "error.h" #include "list.h" #include "sched.h" -#include "ggo.h" #include "buffer_tree.h" #include "recv.h" -#include "afh_recv.cmdline.h" #include "string.h" #include "fd.h" +#include "afh.h" + +struct private_afh_recv_data { + int fd; + void *map; + size_t map_size; + struct afh_info afhi; + int audio_format_num; + long unsigned first_chunk; + long unsigned last_chunk; + struct timeval stream_start; + uint32_t current_chunk; + void *afh_context; +}; -/** - * The init function of the afh receiver. - * - * \param r Pointer to the receiver struct to initialize. - * - * This initializes all function pointers of \a r. - */ -void afh_recv_init(struct receiver *r) +static int afh_execute(struct btr_node *btrn, const char *cmd, char **result) { - struct afh_recv_args_info dummy; - - afh_recv_cmdline_parser_init(&dummy); -#if 0 - r->open = afh_recv_open; - r->close = afh_recv_close; - r->pre_select = afh_recv_pre_select; - r->post_select = afh_recv_post_select; - r->parse_config = afh_recv_parse_config; - r->free_config = afh_recv_free_config; -#endif - r->help = (struct ggo_help) { - .short_help = afh_recv_args_info_help, - .detailed_help = afh_recv_args_info_detailed_help - }; - afh_recv_cmdline_parser_free(&dummy); + struct receiver_node *rn = btr_context(btrn); + struct private_afh_recv_data *pard = rn->private_data; + + *result = NULL; + if (!strcmp(cmd, "seconds_total")) { + *result = make_message("%" PRIu32, pard->afhi.seconds_total); + return 1; + } + if (!strcmp(cmd, "chunks_total")) { + *result = make_message("%" PRIu32, pard->afhi.chunks_total); + return 1; + } + if (!strcmp(cmd, "afhi")) { + afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result); + return 1; + } + if (!strncmp(cmd, "repos", 5)) { + int32_t x; + int ret = para_atoi32(cmd + 5, &x); + if (ret < 0) + return ret; + if (x >= pard->afhi.chunks_total) + return -ERRNO_TO_PARA_ERROR(EINVAL); + pard->first_chunk = afh_get_start_chunk(x, &pard->afhi, + pard->audio_format_num); + pard->current_chunk = pard->first_chunk; + return 1; + } + return -E_BTR_NAVAIL; } + +static int afh_recv_open(struct receiver_node *rn) +{ + struct lls_parse_result *lpr = rn->lpr; + struct private_afh_recv_data *pard; + struct afh_info *afhi; + const char *fn = RECV_CMD_OPT_STRING_VAL(AFH, FILENAME, lpr), *msg; + int32_t bc = RECV_CMD_OPT_INT32_VAL(AFH, BEGIN_CHUNK, lpr); + const struct lls_opt_result *r_e = RECV_CMD_OPT_RESULT(AFH, END_CHUNK, lpr); + int ret; + + if (!fn || *fn == '\0') + return -E_AFH_RECV_BAD_FILENAME; + rn->private_data = pard = para_calloc(sizeof(*pard)); + afhi = &pard->afhi; + ret = mmap_full_file(fn, O_RDONLY, &pard->map, + &pard->map_size, &pard->fd); + if (ret < 0) + goto out; + ret = compute_afhi(fn, pard->map, pard->map_size, + pard->fd, afhi); + if (ret < 0) + goto out_unmap; + pard->audio_format_num = ret; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + msg = "no data chunks"; + if (afhi->chunks_total == 0) + goto out_clear_afhi; + msg = "invalid begin chunk"; + if (PARA_ABS(bc) >= afhi->chunks_total) + goto out_clear_afhi; + if (bc >= 0) + pard->first_chunk = afh_get_start_chunk(bc, &pard->afhi, + pard->audio_format_num); + else + pard->first_chunk = afh_get_start_chunk(afhi->chunks_total + bc, + &pard->afhi, pard->audio_format_num); + if (lls_opt_given(r_e)) { + int32_t ec = lls_int32_val(0, r_e); + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + msg = "invalid end chunk"; + if (PARA_ABS(ec) > afhi->chunks_total) + goto out_clear_afhi; + if (ec >= 0) + pard->last_chunk = ec; + else + pard->last_chunk = afhi->chunks_total + ec; + } else + pard->last_chunk = afhi->chunks_total - 1; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + msg = "begin chunk >= end chunk!?"; + if (pard->first_chunk >= pard->last_chunk) + goto out_clear_afhi; + pard->current_chunk = pard->first_chunk; + return pard->audio_format_num; +out_clear_afhi: + clear_afhi(afhi); + PARA_ERROR_LOG("%s: %s\n", fn, msg); +out_unmap: + para_munmap(pard->map, pard->map_size); + close(pard->fd); +out: + freep(&rn->private_data); + return ret; +} + +static void afh_recv_close(struct receiver_node *rn) +{ + struct private_afh_recv_data *pard; + + if (!rn || !rn->private_data) + return; + pard = rn->private_data; + clear_afhi(&pard->afhi); + para_munmap(pard->map, pard->map_size); + close(pard->fd); + afh_close(pard->afh_context, pard->audio_format_num); + freep(&rn->private_data); +} + +static void afh_recv_pre_select(struct sched *s, void *context) +{ + struct receiver_node *rn = context; + struct private_afh_recv_data *pard = rn->private_data; + struct afh_info *afhi = &pard->afhi; + struct lls_parse_result *lpr = rn->lpr; + struct timeval chunk_time; + int state = generic_recv_pre_select(s, rn); + unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr); + + if (state <= 0) + return; + if (!j_given) { + sched_min_delay(s); + return; + } + compute_chunk_time(pard->current_chunk - pard->first_chunk, + &afhi->chunk_tv, &pard->stream_start, &chunk_time); + sched_request_barrier_or_min_delay(&chunk_time, s); +} + +static int afh_recv_post_select(__a_unused struct sched *s, void *context) +{ + struct receiver_node *rn = context; + struct lls_parse_result *lpr = rn->lpr; + struct private_afh_recv_data *pard = rn->private_data; + struct btr_node *btrn = rn->btrn; + struct afh_info *afhi = &pard->afhi; + int ret; + char *buf; + const char *start; + size_t size; + struct timeval chunk_time; + unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr); + unsigned H_given = RECV_CMD_OPT_GIVEN(AFH, NO_HEADER, lpr); + + ret = btr_node_status(btrn, 0, BTR_NT_ROOT); + if (ret <= 0) + goto out; + if (pard->first_chunk > 0 && !H_given) { + char *header; + afh_get_header(afhi, pard->audio_format_num, pard->map, + pard->map_size, &header, &size); + if (size > 0) { + PARA_INFO_LOG("writing header (%zu bytes)\n", size); + buf = para_malloc(size); + memcpy(buf, header, size); + btr_add_output(buf, size, btrn); + afh_free_header(header, pard->audio_format_num); + } + } + if (!j_given) { + long unsigned n; + for (n = pard->first_chunk; n < pard->last_chunk; n++) { + ret = afh_get_chunk(n, afhi, pard->audio_format_num, + pard->map, pard->map_size, &start, &size, + &pard->afh_context); + if (ret < 0) + goto out; + PARA_DEBUG_LOG("adding %zu bytes\n", size); + btr_add_output_dont_free(start, size, btrn); + } + ret = -E_RECV_EOF; + goto out; + } + if (pard->current_chunk == pard->first_chunk) + pard->stream_start = *now; + else { + compute_chunk_time(pard->current_chunk - pard->first_chunk, + &afhi->chunk_tv, &pard->stream_start, &chunk_time); + ret = tv_diff(&chunk_time, now, NULL); + if (ret > 0) + goto out; + } + ret = afh_get_chunk(pard->current_chunk, afhi, + pard->audio_format_num, pard->map, + pard->map_size, &start, &size, + &pard->afh_context); + if (ret < 0) + goto out; + PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk); + btr_add_output_dont_free(start, size, btrn); + if (pard->current_chunk >= pard->last_chunk) { + ret = -E_RECV_EOF; + goto out; + } + pard->current_chunk++; + ret = 1; +out: + if (ret < 0) { + btr_remove_node(&rn->btrn); + pard->current_chunk = pard->first_chunk; + } + return ret; +} + +/** See \ref recv_init(). */ +const struct receiver lsg_recv_cmd_com_afh_user_data = { + .init = afh_init, + .open = afh_recv_open, + .close = afh_recv_close, + .pre_select = afh_recv_pre_select, + .post_select = afh_recv_post_select, + .execute = afh_execute, +};