Merge branch 'refs/heads/t/i9e'
[paraslash.git] / afh_recv.c
1 /*
2 * Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file afh_recv.c Receiver for streaming local files. */
8
9 #include <regex.h>
10 #include <sys/types.h>
11
12 #include "para.h"
13 #include "error.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "buffer_tree.h"
18 #include "recv.h"
19 #include "afh_recv.cmdline.h"
20 #include "string.h"
21 #include "fd.h"
22 #include "afh.h"
23
24 struct private_afh_recv_data {
25 int fd;
26 void *map;
27 size_t map_size;
28 struct afh_info afhi;
29 int audio_format_num;
30 long unsigned first_chunk;
31 long unsigned last_chunk;
32 struct timeval stream_start;
33 uint32_t current_chunk;
34 };
35
36 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
37 {
38 struct receiver_node *rn = btr_context(btrn);
39 struct private_afh_recv_data *pard = rn->private_data;
40
41 *result = NULL;
42 if (!strcmp(cmd, "seconds_total")) {
43 *result = make_message("%" PRIu32, pard->afhi.seconds_total);
44 return 1;
45 }
46 if (!strcmp(cmd, "chunks_total")) {
47 *result = make_message("%" PRIu32, pard->afhi.chunks_total);
48 return 1;
49 }
50 if (!strcmp(cmd, "afhi")) {
51 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
52 return 1;
53 }
54 if (!strncmp(cmd, "repos", 5)) {
55 int32_t x;
56 int ret = para_atoi32(cmd + 5, &x);
57 if (ret < 0)
58 return ret;
59 if (x >= pard->afhi.chunks_total)
60 return -ERRNO_TO_PARA_ERROR(EINVAL);
61 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi);
62 pard->current_chunk = pard->first_chunk;
63 return 1;
64 }
65 return -E_BTR_NAVAIL;
66 }
67
68 static void *afh_recv_parse_config(int argc, char **argv)
69 {
70 struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp));
71
72 afh_recv_cmdline_parser(argc, argv, tmp);
73 return tmp;
74 }
75
/**
 * Release a configuration obtained from afh_recv_parse_config().
 *
 * \param conf The configuration to free. NULL is accepted and ignored.
 */
static void afh_recv_free_config(void *conf)
{
	if (conf) {
		/* First the fields owned by the parser, then the struct itself. */
		afh_recv_cmdline_parser_free(conf);
		free(conf);
	}
}
83
84 static int afh_recv_open(struct receiver_node *rn)
85 {
86 struct afh_recv_args_info *conf = rn->conf;
87 struct private_afh_recv_data *pard;
88 struct afh_info *afhi;
89 char *filename = conf->filename_arg;
90
91 int ret;
92
93 if (!filename || *filename == '\0')
94 return -E_AFH_RECV_BAD_FILENAME;
95 rn->private_data = pard = para_calloc(sizeof(*pard));
96 afhi = &pard->afhi;
97 ret = mmap_full_file(filename, O_RDONLY, &pard->map,
98 &pard->map_size, &pard->fd);
99 if (ret < 0)
100 goto out;
101 ret = compute_afhi(filename, pard->map, pard->map_size,
102 pard->fd, afhi);
103 if (ret < 0)
104 goto out_unmap;
105 pard->audio_format_num = ret;
106 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
107 if (afhi->chunks_total == 0)
108 goto out_clear_afhi;
109 if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total)
110 goto out_clear_afhi;
111 if (conf->begin_chunk_arg >= 0)
112 pard->first_chunk = afh_get_start_chunk(
113 conf->begin_chunk_arg, &pard->afhi);
114 else
115 pard->first_chunk = afh_get_start_chunk(
116 afhi->chunks_total + conf->begin_chunk_arg,
117 &pard->afhi);
118 if (conf->end_chunk_given) {
119 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
120 if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total)
121 goto out_clear_afhi;
122 if (conf->end_chunk_arg >= 0)
123 pard->last_chunk = conf->end_chunk_arg;
124 else
125 pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg;
126 } else
127 pard->last_chunk = afhi->chunks_total - 1;
128 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
129 if (pard->first_chunk >= pard->last_chunk)
130 goto out_clear_afhi;
131 pard->current_chunk = pard->first_chunk;
132 return pard->audio_format_num;
133 out_clear_afhi:
134 clear_afhi(afhi);
135 out_unmap:
136 para_munmap(pard->map, pard->map_size);
137 close(pard->fd);
138 out:
139 freep(&rn->private_data);
140 return ret;
141 }
142
143 static void afh_recv_close(struct receiver_node *rn)
144 {
145 struct private_afh_recv_data *pard;
146
147 if (!rn || !rn->private_data)
148 return;
149 pard = rn->private_data;
150 clear_afhi(&pard->afhi);
151 para_munmap(pard->map, pard->map_size);
152 close(pard->fd);
153 freep(&rn->private_data);
154 }
155
156 static void afh_recv_pre_select(struct sched *s, void *context)
157 {
158 struct receiver_node *rn = context;
159 struct private_afh_recv_data *pard = rn->private_data;
160 struct afh_info *afhi = &pard->afhi;
161 struct afh_recv_args_info *conf = rn->conf;
162 struct timeval chunk_time;
163 int state = generic_recv_pre_select(s, rn);
164
165 if (state <= 0)
166 return;
167 if (!conf->just_in_time_given) {
168 sched_min_delay(s);
169 return;
170 }
171 compute_chunk_time(pard->current_chunk - pard->first_chunk,
172 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
173 sched_request_barrier_or_min_delay(&chunk_time, s);
174 }
175
176 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
177 {
178 struct receiver_node *rn = context;
179 struct afh_recv_args_info *conf = rn->conf;
180 struct private_afh_recv_data *pard = rn->private_data;
181 struct btr_node *btrn = rn->btrn;
182 struct afh_info *afhi = &pard->afhi;
183 int ret;
184 char *buf;
185 const char *start, *end;
186 size_t size;
187 struct timeval chunk_time;
188
189 ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
190 if (ret <= 0)
191 goto out;
192 if (pard->first_chunk > 0 && !conf->no_header_given) {
193 char *header;
194 afh_get_header(afhi, pard->audio_format_num, pard->map,
195 pard->map_size, &header, &size);
196 if (size > 0) {
197 PARA_INFO_LOG("writing header (%zu bytes)\n", size);
198 buf = para_malloc(size);
199 memcpy(buf, header, size);
200 btr_add_output(buf, size, btrn);
201 afh_free_header(header, pard->audio_format_num);
202 }
203 }
204 if (!conf->just_in_time_given) {
205 afh_get_chunk(pard->first_chunk, afhi, pard->map, &start, &size);
206 afh_get_chunk(pard->last_chunk, afhi, pard->map, &end, &size);
207 end += size;
208 PARA_INFO_LOG("adding %zu bytes\n", end - start);
209 btr_add_output_dont_free(start, end - start, btrn);
210 ret = -E_RECV_EOF;
211 goto out;
212 }
213 if (pard->current_chunk == pard->first_chunk)
214 pard->stream_start = *now;
215 else {
216 compute_chunk_time(pard->current_chunk - pard->first_chunk,
217 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
218 ret = tv_diff(&chunk_time, now, NULL);
219 if (ret > 0)
220 goto out;
221 }
222 afh_get_chunk(pard->current_chunk, afhi, pard->map, &start, &size);
223 PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
224 btr_add_output_dont_free(start, size, btrn);
225 if (pard->current_chunk >= pard->last_chunk) {
226 ret = -E_RECV_EOF;
227 goto out;
228 }
229 pard->current_chunk++;
230 ret = 1;
231 out:
232 if (ret < 0) {
233 btr_remove_node(&rn->btrn);
234 pard->current_chunk = pard->first_chunk;
235 }
236 return ret;
237 }
238
239 /**
240 * The init function of the afh receiver.
241 *
242 * \param r Pointer to the receiver struct to initialize.
243 *
244 * This initializes all function pointers of \a r.
245 */
246 void afh_recv_init(struct receiver *r)
247 {
248 struct afh_recv_args_info dummy;
249
250 afh_init();
251 afh_recv_cmdline_parser_init(&dummy);
252 r->open = afh_recv_open;
253 r->close = afh_recv_close;
254 r->pre_select = afh_recv_pre_select;
255 r->post_select = afh_recv_post_select;
256 r->parse_config = afh_recv_parse_config;
257 r->free_config = afh_recv_free_config;
258 r->execute = afh_execute;
259 r->help = (struct ggo_help)DEFINE_GGO_HELP(afh_recv);
260 afh_recv_cmdline_parser_free(&dummy);
261 }