01785d6c6796dd98bfc514bb11a76fe3cbfaa101
2 * Copyright (C) 2011-2014 Andre Noll <maan@tuebingen.mpg.de>
4 * Licensed under the GPL v2. For licencing details see COPYING.
7 /** \file afh_recv.c Receiver for streaming local files. */
10 #include <sys/types.h>
18 #include "buffer_tree.h"
20 #include "afh_recv.cmdline.h"
/*
 * Per-receiver-node state of the afh receiver. afh_recv_open() allocates one
 * instance with para_calloc() and stores it in rn->private_data.
 *
 * NOTE(review): this listing skips original lines 26-30 of the declaration.
 * Other functions in this file access pard->afhi, pard->audio_format_num,
 * pard->map, pard->map_size and pard->fd, so those members are presumably
 * declared in the missing lines -- confirm against the full source.
 */
25 struct private_afh_recv_data
{
/*
 * Chunk at which streaming starts. Assigned from afh_get_start_chunk() in
 * afh_recv_open() and again by the "repos" command in afh_execute().
 */
31 long unsigned first_chunk
;
/*
 * Chunk that bounds the streamed range. afh_recv_open() derives it from the
 * end_chunk argument, or from chunks_total - 1.
 */
32 long unsigned last_chunk
;
/*
 * Set to *now in afh_recv_post_select() when current_chunk equals
 * first_chunk; passed to compute_chunk_time() as the reference time.
 */
33 struct timeval stream_start
;
/*
 * The chunk afh_recv_post_select() fetches next via afh_get_chunk().
 * NOTE(review): this is uint32_t while first_chunk/last_chunk are
 * long unsigned, so assignments from first_chunk narrow on LP64 targets.
 */
34 uint32_t current_chunk
;
/*
 * Command handler of the afh receiver (installed as r->execute by
 * afh_recv_init()).
 *
 * btrn:   buffer tree node; btr_context() maps it to the receiver node.
 * cmd:    command string. The visible code recognizes "seconds_total",
 *         "chunks_total", "afhi" and any string starting with "repos".
 * result: receives the text produced by the first three commands.
 *
 * Returns -ERRNO_TO_PARA_ERROR(EINVAL) if the chunk number given to "repos"
 * is not smaller than chunks_total.
 *
 * NOTE(review): the remaining return statements, the declaration of x and the
 * closing braces are not part of this listing, so the return values of the
 * other paths cannot be documented from here.
 */
37 static int afh_execute(struct btr_node
*btrn
, const char *cmd
, char **result
)
39 struct receiver_node
*rn
= btr_context(btrn
);
40 struct private_afh_recv_data
*pard
= rn
->private_data
;
/* "seconds_total": report afhi.seconds_total as text. */
/* assumes seconds_total is long unsigned, as "%lu" requires -- TODO confirm */
43 if (!strcmp(cmd
, "seconds_total")) {
44 *result
= make_message("%lu", pard
->afhi
.seconds_total
);
/* "chunks_total": report afhi.chunks_total as text (same "%lu" assumption). */
47 if (!strcmp(cmd
, "chunks_total")) {
48 *result
= make_message("%lu", pard
->afhi
.chunks_total
);
/* "afhi": let afh_get_afhi_txt() fill in *result. */
51 if (!strcmp(cmd
, "afhi")) {
52 afh_get_afhi_txt(pard
->audio_format_num
, &pard
->afhi
, result
);
/* "repos<N>": only the first five characters are compared. */
55 if (!strncmp(cmd
, "repos", 5)) {
/* The text after "repos" is parsed into x by para_atoi32(). */
/* NOTE(review): no check of ret is visible in this listing -- confirm. */
57 int ret
= para_atoi32(cmd
+ 5, &x
);
/* Reject chunk numbers that are not below chunks_total. */
60 if (x
>= pard
->afhi
.chunks_total
)
61 return -ERRNO_TO_PARA_ERROR(EINVAL
);
/* Restart streaming at the chunk afh_get_start_chunk() returns for x. */
62 pard
->first_chunk
= afh_get_start_chunk(x
, &pard
->afhi
);
63 pard
->current_chunk
= pard
->first_chunk
;
/*
 * Config parser of the afh receiver (installed as r->parse_config).
 *
 * Allocates a zeroed struct afh_recv_args_info with para_calloc() and passes
 * it, together with argc and argv, to afh_recv_cmdline_parser().
 *
 * NOTE(review): the return statement is not part of this listing; presumably
 * the allocated structure is returned -- confirm.
 */
69 static void *afh_recv_parse_config(int argc
, char **argv
)
71 struct afh_recv_args_info
*tmp
= para_calloc(sizeof(*tmp
));
73 afh_recv_cmdline_parser(argc
, argv
, tmp
);
/*
 * Releases a configuration (installed as r->free_config) by handing conf to
 * afh_recv_cmdline_parser_free().
 *
 * NOTE(review): original lines 78-80 and 82 onwards are not part of this
 * listing, so further cleanup steps may exist -- confirm.
 */
77 static void afh_recv_free_config(void *conf
)
81 afh_recv_cmdline_parser_free(conf
);
/*
 * Opens the afh receiver (installed as r->open).
 *
 * Visible steps: validate the filename argument, allocate the private data,
 * map the file with mmap_full_file(), run compute_afhi() on the mapping, and
 * derive first_chunk/last_chunk from the begin_chunk/end_chunk arguments.
 *
 * Returns pard->audio_format_num (the value stored from compute_afhi()) on
 * the success path, and -E_AFH_RECV_BAD_FILENAME if no filename was given.
 *
 * NOTE(review): the declaration of ret, the checks of ret after
 * mmap_full_file() and compute_afhi(), the error jumps, the cleanup labels
 * and the final error return are not part of this listing. The comments
 * below on validation failures therefore describe the value left in ret, not
 * a confirmed control flow.
 */
85 static int afh_recv_open(struct receiver_node
*rn
)
87 struct afh_recv_args_info
*conf
= rn
->conf
;
88 struct private_afh_recv_data
*pard
;
/* NOTE(review): no assignment to afhi is visible before its first use below;
 * presumably it is pointed at pard->afhi in a missing line -- confirm. */
89 struct afh_info
*afhi
;
90 char *filename
= conf
->filename_arg
;
/* A NULL or empty filename is rejected before anything is allocated. */
94 if (!filename
|| *filename
== '\0')
95 return -E_AFH_RECV_BAD_FILENAME
;
96 rn
->private_data
= pard
= para_calloc(sizeof(*pard
));
/* Map the file read-only; mapping, size and descriptor land in pard. */
98 ret
= mmap_full_file(filename
, O_RDONLY
, &pard
->map
,
99 &pard
->map_size
, &pard
->fd
);
/* NOTE(review): the remaining arguments of this call are cut off here. */
102 ret
= compute_afhi(filename
, pard
->map
, pard
->map_size
,
/* The value returned by compute_afhi() is kept as the audio format number. */
106 pard
->audio_format_num
= ret
;
/* ret is preset to EINVAL for the validation checks that follow. */
107 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
/* Files without chunks are checked for here. */
108 if (afhi
->chunks_total
== 0)
/* The magnitude of begin_chunk is checked against chunks_total. */
110 if (PARA_ABS(conf
->begin_chunk_arg
) >= afhi
->chunks_total
)
/* Non-negative begin_chunk: passed to afh_get_start_chunk() as is. */
112 if (conf
->begin_chunk_arg
>= 0)
113 pard
->first_chunk
= afh_get_start_chunk(
114 conf
->begin_chunk_arg
, &pard
->afhi
);
/* Otherwise chunks_total + begin_chunk is passed (the "else" is not visible). */
116 pard
->first_chunk
= afh_get_start_chunk(
117 afhi
->chunks_total
+ conf
->begin_chunk_arg
,
/* Explicit end chunk: validate its magnitude, then store it. */
119 if (conf
->end_chunk_given
) {
120 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
/* Unlike begin_chunk, a magnitude equal to chunks_total passes this check. */
121 if (PARA_ABS(conf
->end_chunk_arg
) > afhi
->chunks_total
)
123 if (conf
->end_chunk_arg
>= 0)
124 pard
->last_chunk
= conf
->end_chunk_arg
;
126 pard
->last_chunk
= afhi
->chunks_total
+ conf
->end_chunk_arg
;
/* presumably the branch taken when no end chunk was given -- confirm */
128 pard
->last_chunk
= afhi
->chunks_total
- 1;
129 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
/* first_chunk is compared against last_chunk; equality also matches. */
130 if (pard
->first_chunk
>= pard
->last_chunk
)
/* Success path: start at first_chunk and return the audio format number. */
132 pard
->current_chunk
= pard
->first_chunk
;
133 return pard
->audio_format_num
;
/* Cleanup code: unmap the file and free the private data. */
/* NOTE(review): no close of pard->fd is visible in this listing -- confirm. */
137 para_munmap(pard
->map
, pard
->map_size
);
140 freep(&rn
->private_data
);
/*
 * Closes the afh receiver (installed as r->close).
 *
 * Tests for a NULL rn or NULL rn->private_data first (the statement guarded
 * by that test is not part of this listing; presumably an early return).
 * Then clears the audio format handler info with clear_afhi(), unmaps the
 * file with para_munmap() and frees the private data via freep().
 *
 * NOTE(review): original line 153 is missing from this listing; whether
 * pard->fd is closed there cannot be confirmed from here.
 */
144 static void afh_recv_close(struct receiver_node
*rn
)
146 struct private_afh_recv_data
*pard
;
148 if (!rn
|| !rn
->private_data
)
150 pard
= rn
->private_data
;
151 clear_afhi(&pard
->afhi
);
152 para_munmap(pard
->map
, pard
->map_size
);
154 freep(&rn
->private_data
);
/*
 * Pre-select hook of the afh receiver (installed as r->pre_select).
 *
 * s:       scheduler instance, forwarded to generic_recv_pre_select() and
 *          sched_request_barrier_or_min_delay().
 * context: the struct receiver_node of this receiver.
 *
 * The visible code calls generic_recv_pre_select(), tests the just_in_time
 * option, computes a chunk time for the offset current_chunk - first_chunk
 * with compute_chunk_time(), and hands that time to
 * sched_request_barrier_or_min_delay().
 *
 * NOTE(review): the statements using `state` and the body of the
 * just_in_time test are not part of this listing, so which of the calls
 * below are conditional cannot be determined from here.
 */
157 static void afh_recv_pre_select(struct sched
*s
, void *context
)
159 struct receiver_node
*rn
= context
;
160 struct private_afh_recv_data
*pard
= rn
->private_data
;
161 struct afh_info
*afhi
= &pard
->afhi
;
162 struct afh_recv_args_info
*conf
= rn
->conf
;
163 struct timeval chunk_time
;
164 int state
= generic_recv_pre_select(s
, rn
);
168 if (!conf
->just_in_time_given
) {
/* Chunk time for the number of chunks emitted since first_chunk. */
172 compute_chunk_time(pard
->current_chunk
- pard
->first_chunk
,
173 &afhi
->chunk_tv
, &pard
->stream_start
, &chunk_time
);
174 sched_request_barrier_or_min_delay(&chunk_time
, s
);
/*
 * Post-select hook of the afh receiver (installed as r->post_select).
 *
 * s:       unused (marked __a_unused).
 * context: the struct receiver_node of this receiver.
 *
 * Visible steps:
 *  - query the node status with btr_node_status(btrn, 0, BTR_NT_ROOT);
 *  - if first_chunk > 0 and the no_header option is not given, fetch the
 *    header with afh_get_header(), copy it into a para_malloc()ed buffer and
 *    feed the copy to btr_add_output();
 *  - if the just_in_time option is not given, look up first_chunk and
 *    last_chunk with afh_get_chunk() and add the range between the two
 *    returned pointers with btr_add_output_dont_free();
 *  - otherwise-looking code: record stream_start at the first chunk, compute
 *    the chunk time, compare it with *now via tv_diff(), add the current
 *    chunk and advance current_chunk.
 *
 * NOTE(review): the declarations of ret, header, buf and size, all return
 * and goto statements, the labels and several branch bodies are not part of
 * this listing. `now` is not declared in this block either; presumably it is
 * provided by the scheduler -- confirm. Control flow between the steps above
 * is therefore not documented here.
 */
177 static int afh_recv_post_select(__a_unused
struct sched
*s
, void *context
)
179 struct receiver_node
*rn
= context
;
180 struct afh_recv_args_info
*conf
= rn
->conf
;
181 struct private_afh_recv_data
*pard
= rn
->private_data
;
182 struct btr_node
*btrn
= rn
->btrn
;
183 struct afh_info
*afhi
= &pard
->afhi
;
186 const char *start
, *end
;
188 struct timeval chunk_time
;
190 ret
= btr_node_status(btrn
, 0, BTR_NT_ROOT
);
/* Header handling is only entered when streaming does not start at chunk 0. */
193 if (pard
->first_chunk
> 0 && !conf
->no_header_given
) {
195 afh_get_header(afhi
, pard
->audio_format_num
, pard
->map
,
196 pard
->map_size
, &header
, &size
);
198 PARA_INFO_LOG("writing header (%zu bytes)\n", size
);
/* The header is copied because btr_add_output() is given its own buffer,
 * while the original header is released by afh_free_header() below. */
199 buf
= para_malloc(size
);
200 memcpy(buf
, header
, size
);
201 btr_add_output(buf
, size
, btrn
);
202 afh_free_header(header
, pard
->audio_format_num
);
/* Without just_in_time the chunk range is added in a single call. */
205 if (!conf
->just_in_time_given
) {
206 afh_get_chunk(pard
->first_chunk
, afhi
, pard
->map
, &start
, &size
);
207 afh_get_chunk(pard
->last_chunk
, afhi
, pard
->map
, &end
, &size
);
/* NOTE(review): original line 208 is missing here; end may be adjusted there. */
/* NOTE(review): end - start is a pointer difference (ptrdiff_t) but is
 * printed with "%zu", which expects size_t. */
209 PARA_INFO_LOG("adding %zu bytes\n", end
- start
);
/* Data is added without ownership transfer (the "_dont_free" variant). */
210 btr_add_output_dont_free(start
, end
- start
, btrn
);
/* At the first chunk, the current time becomes the stream start. */
214 if (pard
->current_chunk
== pard
->first_chunk
)
215 pard
->stream_start
= *now
;
217 compute_chunk_time(pard
->current_chunk
- pard
->first_chunk
,
218 &afhi
->chunk_tv
, &pard
->stream_start
, &chunk_time
);
/* Compare the computed chunk time against the current time. */
219 ret
= tv_diff(&chunk_time
, now
, NULL
);
223 afh_get_chunk(pard
->current_chunk
, afhi
, pard
->map
, &start
, &size
);
224 PARA_DEBUG_LOG("adding chunk %u\n", pard
->current_chunk
);
225 btr_add_output_dont_free(start
, size
, btrn
);
/* End-of-range test; the body of this branch is not part of this listing. */
226 if (pard
->current_chunk
>= pard
->last_chunk
) {
230 pard
->current_chunk
++;
/* presumably the error/end-of-stream path: drop the node and rewind -- confirm */
234 btr_remove_node(&rn
->btrn
);
235 pard
->current_chunk
= pard
->first_chunk
;
241 * The init function of the afh receiver.
243 * \param r Pointer to the receiver struct to initialize.
245 * This initializes all function pointers of \a r.
/*
 * Fills in the receiver structure r for the afh receiver: the open, close,
 * pre_select, post_select, parse_config, free_config and execute members are
 * pointed at the static functions of this file, and r->help is set from
 * DEFINE_GGO_HELP(afh_recv).
 *
 * A temporary struct afh_recv_args_info is initialized with
 * afh_recv_cmdline_parser_init() before the assignments and released with
 * afh_recv_cmdline_parser_free() afterwards.
 */
247 void afh_recv_init(struct receiver
*r
)
249 struct afh_recv_args_info dummy
;
252 afh_recv_cmdline_parser_init(&dummy
);
253 r
->open
= afh_recv_open
;
254 r
->close
= afh_recv_close
;
255 r
->pre_select
= afh_recv_pre_select
;
256 r
->post_select
= afh_recv_post_select
;
257 r
->parse_config
= afh_recv_parse_config
;
258 r
->free_config
= afh_recv_free_config
;
259 r
->execute
= afh_execute
;
260 r
->help
= (struct ggo_help
)DEFINE_GGO_HELP(afh_recv
);
261 afh_recv_cmdline_parser_free(&dummy
);