08f0d1e7c4c615ac96b8ddc27a39af41f5fa1625
[paraslash.git] / afh_recv.c
1 /*
2  * Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file afh_recv.c Receiver for streaming local files. */
8
9 #include <regex.h>
10 #include <sys/types.h>
11
12 #include "para.h"
13 #include "error.h"
14 #include "list.h"
15 #include "sched.h"
16 #include "ggo.h"
17 #include "buffer_tree.h"
18 #include "recv.h"
19 #include "afh_recv.cmdline.h"
20 #include "string.h"
21 #include "fd.h"
22 #include "afh.h"
23
24 struct private_afh_recv_data {
25         int fd;
26         void *map;
27         size_t map_size;
28         struct afh_info afhi;
29         int audio_format_num;
30         long unsigned first_chunk;
31         long unsigned last_chunk;
32         struct timeval stream_start;
33         uint32_t current_chunk;
34         void *afh_context;
35 };
36
37 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
38 {
39         struct receiver_node *rn = btr_context(btrn);
40         struct private_afh_recv_data *pard = rn->private_data;
41
42         *result = NULL;
43         if (!strcmp(cmd, "seconds_total")) {
44                 *result = make_message("%" PRIu32, pard->afhi.seconds_total);
45                 return 1;
46         }
47         if (!strcmp(cmd, "chunks_total")) {
48                 *result = make_message("%" PRIu32, pard->afhi.chunks_total);
49                 return 1;
50         }
51         if (!strcmp(cmd, "afhi")) {
52                 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
53                 return 1;
54         }
55         if (!strncmp(cmd, "repos", 5)) {
56                 int32_t x;
57                 int ret = para_atoi32(cmd + 5, &x);
58                 if (ret < 0)
59                         return ret;
60                 if (x >= pard->afhi.chunks_total)
61                         return -ERRNO_TO_PARA_ERROR(EINVAL);
62                 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi,
63                         pard->audio_format_num);
64                 pard->current_chunk = pard->first_chunk;
65                 return 1;
66         }
67         return -E_BTR_NAVAIL;
68 }
69
70 static void *afh_recv_parse_config(int argc, char **argv)
71 {
72         struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp));
73
74         afh_recv_cmdline_parser(argc, argv, tmp);
75         return tmp;
76 }
77
/*
 * Release a configuration structure: first let the command line parser free
 * what it holds, then free the structure itself. Passing NULL is a no-op.
 */
static void afh_recv_free_config(void *conf)
{
	if (conf) {
		afh_recv_cmdline_parser_free(conf);
		free(conf);
	}
}
85
86 static int afh_recv_open(struct receiver_node *rn)
87 {
88         struct afh_recv_args_info *conf = rn->conf;
89         struct private_afh_recv_data *pard;
90         struct afh_info *afhi;
91         char *filename = conf->filename_arg;
92
93         int ret;
94
95         if (!filename || *filename == '\0')
96                 return -E_AFH_RECV_BAD_FILENAME;
97         rn->private_data = pard = para_calloc(sizeof(*pard));
98         afhi = &pard->afhi;
99         ret = mmap_full_file(filename, O_RDONLY, &pard->map,
100                 &pard->map_size, &pard->fd);
101         if (ret < 0)
102                 goto out;
103         ret = compute_afhi(filename, pard->map, pard->map_size,
104                 pard->fd, afhi);
105         if (ret < 0)
106                 goto out_unmap;
107         pard->audio_format_num = ret;
108         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
109         if (afhi->chunks_total == 0)
110                 goto out_clear_afhi;
111         if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total)
112                 goto out_clear_afhi;
113         if (conf->begin_chunk_arg >= 0)
114                 pard->first_chunk = afh_get_start_chunk(
115                         conf->begin_chunk_arg, &pard->afhi,
116                         pard->audio_format_num);
117         else
118                 pard->first_chunk = afh_get_start_chunk(
119                         afhi->chunks_total + conf->begin_chunk_arg,
120                         &pard->afhi, pard->audio_format_num);
121         if (conf->end_chunk_given) {
122                 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
123                 if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total)
124                         goto out_clear_afhi;
125                 if (conf->end_chunk_arg >= 0)
126                         pard->last_chunk = conf->end_chunk_arg;
127                 else
128                         pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg;
129         } else
130                 pard->last_chunk = afhi->chunks_total - 1;
131         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
132         if (pard->first_chunk >= pard->last_chunk)
133                 goto out_clear_afhi;
134         pard->current_chunk = pard->first_chunk;
135         return pard->audio_format_num;
136 out_clear_afhi:
137         clear_afhi(afhi);
138 out_unmap:
139         para_munmap(pard->map, pard->map_size);
140         close(pard->fd);
141 out:
142         freep(&rn->private_data);
143         return ret;
144 }
145
146 static void afh_recv_close(struct receiver_node *rn)
147 {
148         struct private_afh_recv_data *pard;
149
150         if (!rn || !rn->private_data)
151                 return;
152         pard = rn->private_data;
153         clear_afhi(&pard->afhi);
154         para_munmap(pard->map, pard->map_size);
155         close(pard->fd);
156         afh_close(pard->afh_context, pard->audio_format_num);
157         freep(&rn->private_data);
158 }
159
160 static void afh_recv_pre_select(struct sched *s, void *context)
161 {
162         struct receiver_node *rn = context;
163         struct private_afh_recv_data *pard = rn->private_data;
164         struct afh_info *afhi = &pard->afhi;
165         struct afh_recv_args_info *conf = rn->conf;
166         struct timeval chunk_time;
167         int state = generic_recv_pre_select(s, rn);
168
169         if (state <= 0)
170                 return;
171         if (!conf->just_in_time_given) {
172                 sched_min_delay(s);
173                 return;
174         }
175         compute_chunk_time(pard->current_chunk - pard->first_chunk,
176                 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
177         sched_request_barrier_or_min_delay(&chunk_time, s);
178 }
179
180 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
181 {
182         struct receiver_node *rn = context;
183         struct afh_recv_args_info *conf = rn->conf;
184         struct private_afh_recv_data *pard = rn->private_data;
185         struct btr_node *btrn = rn->btrn;
186         struct afh_info *afhi = &pard->afhi;
187         int ret;
188         char *buf;
189         const char *start;
190         size_t size;
191         struct timeval chunk_time;
192
193         ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
194         if (ret <= 0)
195                 goto out;
196         if (pard->first_chunk > 0 && !conf->no_header_given) {
197                 char *header;
198                 afh_get_header(afhi, pard->audio_format_num, pard->map,
199                         pard->map_size, &header, &size);
200                 if (size > 0) {
201                         PARA_INFO_LOG("writing header (%zu bytes)\n", size);
202                         buf = para_malloc(size);
203                         memcpy(buf, header, size);
204                         btr_add_output(buf, size, btrn);
205                         afh_free_header(header, pard->audio_format_num);
206                 }
207         }
208         if (!conf->just_in_time_given) {
209                 long unsigned n;
210                 for (n = pard->first_chunk; n < pard->last_chunk; n++) {
211                         ret = afh_get_chunk(n, afhi, pard->audio_format_num,
212                                 pard->map, pard->map_size, &start, &size,
213                                 &pard->afh_context);
214                         if (ret < 0)
215                                 goto out;
216                         PARA_INFO_LOG("adding %zu bytes\n", size);
217                         btr_add_output_dont_free(start, size, btrn);
218                 }
219                 ret = -E_RECV_EOF;
220                 goto out;
221         }
222         if (pard->current_chunk == pard->first_chunk)
223                 pard->stream_start = *now;
224         else {
225                 compute_chunk_time(pard->current_chunk - pard->first_chunk,
226                         &afhi->chunk_tv, &pard->stream_start, &chunk_time);
227                 ret = tv_diff(&chunk_time, now, NULL);
228                 if (ret > 0)
229                         goto out;
230         }
231         ret = afh_get_chunk(pard->current_chunk, afhi,
232                 pard->audio_format_num, pard->map,
233                 pard->map_size, &start, &size,
234                 &pard->afh_context);
235         PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
236         btr_add_output_dont_free(start, size, btrn);
237         if (pard->current_chunk >= pard->last_chunk) {
238                 ret = -E_RECV_EOF;
239                 goto out;
240         }
241         pard->current_chunk++;
242         ret = 1;
243 out:
244         if (ret < 0) {
245                 btr_remove_node(&rn->btrn);
246                 pard->current_chunk = pard->first_chunk;
247         }
248         return ret;
249 }
250
251 /**
252  * The init function of the afh receiver.
253  *
254  * \param r Pointer to the receiver struct to initialize.
255  *
256  * This initializes all function pointers of \a r.
257  */
258 void afh_recv_init(struct receiver *r)
259 {
260         struct afh_recv_args_info dummy;
261
262         afh_init();
263         afh_recv_cmdline_parser_init(&dummy);
264         r->open = afh_recv_open;
265         r->close = afh_recv_close;
266         r->pre_select = afh_recv_pre_select;
267         r->post_select = afh_recv_post_select;
268         r->parse_config = afh_recv_parse_config;
269         r->free_config = afh_recv_free_config;
270         r->execute = afh_execute;
271         r->help = (struct ggo_help)DEFINE_GGO_HELP(afh_recv);
272         afh_recv_cmdline_parser_free(&dummy);
273 }