2 * Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>
4 * Licensed under the GPL v2. For licensing details see COPYING.
7 /** \file afh_recv.c Receiver for streaming local files. */
10 #include <sys/types.h>
17 #include "buffer_tree.h"
19 #include "afh_recv.cmdline.h"
/*
 * State of one afh receiver instance. afh_recv_open() allocates it with
 * para_calloc() and stores it in rn->private_data; afh_recv_close() releases
 * it again.
 */
24 struct private_afh_recv_data {
/*
 * First chunk to stream. Computed with afh_get_start_chunk() in
 * afh_recv_open() and again by the "repos" command in afh_execute().
 */
30 long unsigned first_chunk;
/* Last chunk to stream, set in afh_recv_open(). */
31 long unsigned last_chunk;
/*
 * Set to *now by afh_recv_post_select() whenever current_chunk equals
 * first_chunk; passed to compute_chunk_time() as the reference time.
 */
32 struct timeval stream_start;
/*
 * Chunk that afh_recv_post_select() passes to afh_get_chunk() next; it is
 * incremented after a chunk was added and reset to first_chunk on "repos".
 * NOTE(review): this is 32 bits wide while first_chunk/last_chunk are
 * long unsigned, so the assignments from first_chunk narrow the value on
 * platforms where long is wider than 32 bits -- confirm this is harmless.
 */
33 uint32_t current_chunk;
/*
 * Command handler of the afh receiver; afh_recv_init() installs it as
 * r->execute.
 *
 * btrn:   buffer tree node whose context (btr_context()) is the receiver node.
 * cmd:    "seconds_total", "chunks_total", "afhi", or a string starting with
 *         "repos" followed by a chunk number.
 * result: receives the text produced by the three query commands.
 *
 * The "repos" command fails with -ERRNO_TO_PARA_ERROR(EINVAL) if the requested
 * chunk number is not below afhi.chunks_total.
 */
36 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
38 struct receiver_node *rn = btr_context(btrn);
39 struct private_afh_recv_data *pard = rn->private_data;
/* Query: total length as stored in afhi.seconds_total, formatted with %lu. */
42 if (!strcmp(cmd, "seconds_total")) {
43 *result = make_message("%lu", pard->afhi.seconds_total);
/* Query: number of chunks as stored in afhi.chunks_total. */
46 if (!strcmp(cmd, "chunks_total")) {
47 *result = make_message("%lu", pard->afhi.chunks_total);
/* Query: textual dump of the audio file header info, written via result. */
50 if (!strcmp(cmd, "afhi")) {
51 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
/*
 * Only the first five characters are compared, so any command beginning
 * with "repos" lands here; everything after the prefix is handed to
 * para_atoi32() (presumably a checked string-to-int32 conversion).
 */
54 if (!strncmp(cmd, "repos", 5)) {
56 int ret = para_atoi32(cmd + 5, &x);
/* Refuse to reposition at or beyond the end of the file. */
59 if (x >= pard->afhi.chunks_total)
60 return -ERRNO_TO_PARA_ERROR(EINVAL);
/*
 * The requested chunk is mapped through afh_get_start_chunk() and
 * streaming restarts from the resulting chunk.
 */
61 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi);
62 pard->current_chunk = pard->first_chunk;
/*
 * Config parser of the afh receiver; afh_recv_init() installs it as
 * r->parse_config.
 *
 * Allocates a struct afh_recv_args_info with para_calloc() and lets
 * afh_recv_cmdline_parser() fill it from argc/argv. The result of the parser
 * call is not stored in the visible code.
 */
68 static void *afh_recv_parse_config(int argc, char **argv)
70 struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp));
72 afh_recv_cmdline_parser(argc, argv, tmp);
/*
 * Counterpart of afh_recv_parse_config(); afh_recv_init() installs it as
 * r->free_config.
 *
 * conf: the configuration pointer, passed on unchanged to
 *       afh_recv_cmdline_parser_free().
 */
76 static void afh_recv_free_config(void *conf)
80 afh_recv_cmdline_parser_free(conf);
/*
 * Open callback of the afh receiver.
 *
 * Maps the configured file, computes its audio file header info and derives
 * the chunk range [first_chunk, last_chunk] from the begin/end chunk options.
 *
 * rn: receiver node; rn->conf is the struct afh_recv_args_info of this
 *     instance, rn->private_data is set up here.
 *
 * Returns the audio format number obtained from compute_afhi() on success and
 * -E_AFH_RECV_BAD_FILENAME if no (or an empty) file name was configured.
 */
84 static int afh_recv_open(struct receiver_node *rn)
86 struct afh_recv_args_info *conf = rn->conf;
87 struct private_afh_recv_data *pard;
88 struct afh_info *afhi;
89 char *filename = conf->filename_arg;
/* Bail out before anything is allocated if there is no usable file name. */
93 if (!filename || *filename == '\0')
94 return -E_AFH_RECV_BAD_FILENAME;
/* Zero-initialized private data, shared with the other callbacks via rn. */
95 rn->private_data = pard = para_calloc(sizeof(*pard));
/* Map the whole file read-only; map, map_size and fd are kept in pard. */
97 ret = mmap_full_file(filename, O_RDONLY, &pard->map,
98 &pard->map_size, &pard->fd);
101 ret = compute_afhi(filename, pard->map, pard->map_size,
/* The value returned by compute_afhi() identifies the audio format. */
105 pard->audio_format_num = ret;
/* Preset the error code for the sanity checks below. */
106 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
107 if (afhi->chunks_total == 0)
/*
 * begin_chunk_arg may be negative, hence the comparison of its absolute
 * value against the number of chunks.
 */
109 if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total)
/* Non-negative values are chunk numbers counted from the start ... */
111 if (conf->begin_chunk_arg >= 0)
112 pard->first_chunk = afh_get_start_chunk(
113 conf->begin_chunk_arg, &pard->afhi);
/* ... otherwise the (negative) value is added to the total chunk count. */
115 pard->first_chunk = afh_get_start_chunk(
116 afhi->chunks_total + conf->begin_chunk_arg,
118 if (conf->end_chunk_given) {
119 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
/*
 * NOTE(review): this check uses '>' whereas the begin chunk check uses
 * '>=', so end_chunk_arg == chunks_total passes and yields a last_chunk
 * one above the default of chunks_total - 1 set further down -- confirm
 * that this is intended.
 */
120 if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total)
122 if (conf->end_chunk_arg >= 0)
123 pard->last_chunk = conf->end_chunk_arg;
/* A negative end chunk is added to the total chunk count. */
125 pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg;
/* Default end of the range: the final chunk of the file. */
127 pard->last_chunk = afhi->chunks_total - 1;
128 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
/* The range is checked for first_chunk being strictly below last_chunk. */
129 if (pard->first_chunk >= pard->last_chunk)
/* Streaming starts at the first chunk of the range. */
131 pard->current_chunk = pard->first_chunk;
132 return pard->audio_format_num;
/*
 * Everything below follows the return statement above and is therefore only
 * reached through a jump: undo the mapping and drop the private data.
 */
136 para_munmap(pard->map, pard->map_size);
139 freep(&rn->private_data);
/*
 * Close callback of the afh receiver: releases what afh_recv_open() set up.
 *
 * rn: receiver node; both rn and rn->private_data are checked for NULL before
 *     anything is touched.
 */
143 static void afh_recv_close(struct receiver_node *rn)
145 struct private_afh_recv_data *pard;
147 if (!rn || !rn->private_data)
149 pard = rn->private_data;
/* Release the header info, then the file mapping, then the struct itself. */
150 clear_afhi(&pard->afhi);
151 para_munmap(pard->map, pard->map_size);
/* freep() receives the address of the pointer (presumably to reset it). */
153 freep(&rn->private_data);
/*
 * Pre-select callback of the afh receiver.
 *
 * s:       the scheduler instance, passed on to the helpers called here.
 * context: the receiver node this callback was registered for.
 */
156 static void afh_recv_pre_select(struct sched *s, void *context)
158 struct receiver_node *rn = context;
159 struct private_afh_recv_data *pard = rn->private_data;
160 struct afh_info *afhi = &pard->afhi;
161 struct afh_recv_args_info *conf = rn->conf;
162 struct timeval chunk_time;
/* Common receiver handling comes first; its result is kept in state. */
163 int state = generic_recv_pre_select(s, rn);
/* Case: the just-in-time option was not given. */
167 if (!conf->just_in_time_given) {
/*
 * compute_chunk_time() gets the number of chunks streamed so far
 * (current_chunk - first_chunk), the per-chunk time afhi->chunk_tv and the
 * stream start; the resulting chunk_time is handed to the scheduler as a
 * barrier.
 */
171 compute_chunk_time(pard->current_chunk - pard->first_chunk,
172 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
173 sched_request_barrier_or_min_delay(&chunk_time, s);
/*
 * Post-select callback of the afh receiver: feeds data from the mapped file
 * into the buffer tree.
 *
 * s:       unused.
 * context: the receiver node this callback was registered for.
 */
176 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
178 struct receiver_node *rn = context;
179 struct afh_recv_args_info *conf = rn->conf;
180 struct private_afh_recv_data *pard = rn->private_data;
181 struct btr_node *btrn = rn->btrn;
182 struct afh_info *afhi = &pard->afhi;
185 const char *start, *end;
187 struct timeval chunk_time;
/* Query the state of our node, which is a root node of the buffer tree. */
189 ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
/*
 * The audio file header is only looked up if streaming does not start at
 * chunk 0 and the no-header option was not given.
 */
192 if (pard->first_chunk > 0 && !conf->no_header_given) {
194 afh_get_header(afhi, pard->audio_format_num, pard->map,
195 pard->map_size, &header, &size);
197 PARA_INFO_LOG("writing header (%zu bytes)\n", size);
/*
 * The header is copied into a freshly allocated buffer before it is
 * added as output, and the original is released right afterwards.
 */
198 buf = para_malloc(size);
199 memcpy(buf, header, size);
200 btr_add_output(buf, size, btrn);
201 afh_free_header(header, pard->audio_format_num);
/* Case: the just-in-time option was not given. */
204 if (!conf->just_in_time_given) {
205 afh_get_chunk(pard->first_chunk, afhi, pard->map, &start, &size);
206 afh_get_chunk(pard->last_chunk, afhi, pard->map, &end, &size);
/*
 * NOTE(review): end - start is a pointer difference (ptrdiff_t) but is
 * printed with %zu, which expects a size_t -- consider a cast or %td.
 */
208 PARA_INFO_LOG("adding %zu bytes\n", end - start);
/* The data is added in place, i.e. it points into the mapped file. */
209 btr_add_output_dont_free(start, end - start, btrn);
/* Remember when the first chunk of the range was due. */
213 if (pard->current_chunk == pard->first_chunk)
214 pard->stream_start = *now;
/*
 * Compute the time of the current chunk from its offset within the range
 * and compare it against the current time.
 */
216 compute_chunk_time(pard->current_chunk - pard->first_chunk,
217 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
218 ret = tv_diff(&chunk_time, now, NULL);
/* Hand the current chunk to the buffer tree, again without copying. */
222 afh_get_chunk(pard->current_chunk, afhi, pard->map, &start, &size);
223 PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
224 btr_add_output_dont_free(start, size, btrn);
/* Check whether the chunk just added was the last one of the range. */
225 if (pard->current_chunk >= pard->last_chunk) {
229 pard->current_chunk++;
/*
 * Detach from the buffer tree and rewind to the start of the range
 * (presumably the end-of-stream/error path -- TODO confirm).
 */
233 btr_remove_node(&rn->btrn);
234 pard->current_chunk = pard->first_chunk;
240 * The init function of the afh receiver.
242 * \param r Pointer to the receiver struct to initialize.
244 * This initializes all function pointers of \a r.
246 void afh_recv_init(struct receiver *r)
/*
 * Scratch option struct: it is initialized before and freed after the
 * assignments below and is not referenced by name in between (presumably
 * needed by the help definition -- TODO confirm).
 */
248 struct afh_recv_args_info dummy;
251 afh_recv_cmdline_parser_init(&dummy);
/* Install the callbacks of the afh receiver. */
252 r->open = afh_recv_open;
253 r->close = afh_recv_close;
254 r->pre_select = afh_recv_pre_select;
255 r->post_select = afh_recv_post_select;
256 r->parse_config = afh_recv_parse_config;
257 r->free_config = afh_recv_free_config;
258 r->execute = afh_execute;
/* Help text of the receiver, built by the DEFINE_GGO_HELP() macro. */
259 r->help = (struct ggo_help)DEFINE_GGO_HELP(afh_recv);
260 afh_recv_cmdline_parser_free(&dummy);