play: Implement help --long.
[paraslash.git] / afh_recv.c
1 /* Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file afh_recv.c Receiver for streaming local files. */
4
5 #include <regex.h>
6 #include <sys/types.h>
7 #include <lopsub.h>
8
9 #include "recv_cmd.lsg.h"
10 #include "para.h"
11 #include "error.h"
12 #include "list.h"
13 #include "sched.h"
14 #include "buffer_tree.h"
15 #include "recv.h"
16 #include "string.h"
17 #include "fd.h"
18 #include "afh.h"
19
20 struct private_afh_recv_data {
21         int fd;
22         void *map;
23         size_t map_size;
24         struct afh_info afhi;
25         int audio_format_num;
26         long unsigned first_chunk;
27         long unsigned last_chunk;
28         struct timeval stream_start;
29         uint32_t current_chunk;
30         void *afh_context;
31 };
32
33 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
34 {
35         struct receiver_node *rn = btr_context(btrn);
36         struct private_afh_recv_data *pard = rn->private_data;
37
38         *result = NULL;
39         if (!strcmp(cmd, "seconds_total")) {
40                 *result = make_message("%" PRIu32, pard->afhi.seconds_total);
41                 return 1;
42         }
43         if (!strcmp(cmd, "chunks_total")) {
44                 *result = make_message("%" PRIu32, pard->afhi.chunks_total);
45                 return 1;
46         }
47         if (!strcmp(cmd, "afhi")) {
48                 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
49                 return 1;
50         }
51         if (!strncmp(cmd, "repos", 5)) {
52                 int32_t x;
53                 int ret = para_atoi32(cmd + 5, &x);
54                 if (ret < 0)
55                         return ret;
56                 if (x >= pard->afhi.chunks_total)
57                         return -ERRNO_TO_PARA_ERROR(EINVAL);
58                 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi,
59                         pard->audio_format_num);
60                 pard->current_chunk = pard->first_chunk;
61                 return 1;
62         }
63         return -E_BTR_NAVAIL;
64 }
65
66 static int afh_recv_open(struct receiver_node *rn)
67 {
68         struct lls_parse_result *lpr = rn->lpr;
69         struct private_afh_recv_data *pard;
70         struct afh_info *afhi;
71         const char *fn = RECV_CMD_OPT_STRING_VAL(AFH, FILENAME, lpr), *msg;
72         int32_t bc = RECV_CMD_OPT_INT32_VAL(AFH, BEGIN_CHUNK, lpr);
73         const struct lls_opt_result *r_e = RECV_CMD_OPT_RESULT(AFH, END_CHUNK, lpr);
74         int ret;
75
76         if (!fn || *fn == '\0')
77                 return -E_AFH_RECV_BAD_FILENAME;
78         rn->private_data = pard = para_calloc(sizeof(*pard));
79         afhi = &pard->afhi;
80         ret = mmap_full_file(fn, O_RDONLY, &pard->map,
81                 &pard->map_size, &pard->fd);
82         if (ret < 0)
83                 goto out;
84         ret = compute_afhi(fn, pard->map, pard->map_size,
85                 pard->fd, afhi);
86         if (ret < 0)
87                 goto out_unmap;
88         pard->audio_format_num = ret;
89         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
90         msg = "no data chunks";
91         if (afhi->chunks_total == 0)
92                 goto out_clear_afhi;
93         msg = "invalid begin chunk";
94         if (PARA_ABS(bc) >= afhi->chunks_total)
95                 goto out_clear_afhi;
96         if (bc >= 0)
97                 pard->first_chunk = afh_get_start_chunk(bc, &pard->afhi,
98                         pard->audio_format_num);
99         else
100                 pard->first_chunk = afh_get_start_chunk(afhi->chunks_total + bc,
101                         &pard->afhi, pard->audio_format_num);
102         if (lls_opt_given(r_e)) {
103                 int32_t ec = lls_int32_val(0, r_e);
104                 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
105                 msg = "invalid end chunk";
106                 if (PARA_ABS(ec) > afhi->chunks_total)
107                         goto out_clear_afhi;
108                 if (ec >= 0)
109                         pard->last_chunk = ec;
110                 else
111                         pard->last_chunk = afhi->chunks_total + ec;
112         } else
113                 pard->last_chunk = afhi->chunks_total - 1;
114         ret = -ERRNO_TO_PARA_ERROR(EINVAL);
115         msg = "begin chunk >= end chunk!?";
116         if (pard->first_chunk >= pard->last_chunk)
117                 goto out_clear_afhi;
118         pard->current_chunk = pard->first_chunk;
119         return pard->audio_format_num;
120 out_clear_afhi:
121         clear_afhi(afhi);
122         PARA_ERROR_LOG("%s: %s\n", fn, msg);
123 out_unmap:
124         para_munmap(pard->map, pard->map_size);
125         close(pard->fd);
126 out:
127         freep(&rn->private_data);
128         return ret;
129 }
130
131 static void afh_recv_close(struct receiver_node *rn)
132 {
133         struct private_afh_recv_data *pard;
134
135         if (!rn || !rn->private_data)
136                 return;
137         pard = rn->private_data;
138         clear_afhi(&pard->afhi);
139         para_munmap(pard->map, pard->map_size);
140         close(pard->fd);
141         afh_close(pard->afh_context, pard->audio_format_num);
142         freep(&rn->private_data);
143 }
144
145 static void afh_recv_pre_select(struct sched *s, void *context)
146 {
147         struct receiver_node *rn = context;
148         struct private_afh_recv_data *pard = rn->private_data;
149         struct afh_info *afhi = &pard->afhi;
150         struct lls_parse_result *lpr = rn->lpr;
151         struct timeval chunk_time;
152         int state = generic_recv_pre_select(s, rn);
153         unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
154
155         if (state <= 0)
156                 return;
157         if (!j_given) {
158                 sched_min_delay(s);
159                 return;
160         }
161         compute_chunk_time(pard->current_chunk - pard->first_chunk,
162                 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
163         sched_request_barrier_or_min_delay(&chunk_time, s);
164 }
165
166 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
167 {
168         struct receiver_node *rn = context;
169         struct lls_parse_result *lpr = rn->lpr;
170         struct private_afh_recv_data *pard = rn->private_data;
171         struct btr_node *btrn = rn->btrn;
172         struct afh_info *afhi = &pard->afhi;
173         int ret;
174         char *buf;
175         const char *start;
176         size_t size;
177         struct timeval chunk_time;
178         unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
179         unsigned H_given = RECV_CMD_OPT_GIVEN(AFH, NO_HEADER, lpr);
180
181         ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
182         if (ret <= 0)
183                 goto out;
184         if (pard->first_chunk > 0 && !H_given) {
185                 char *header;
186                 afh_get_header(afhi, pard->audio_format_num, pard->map,
187                         pard->map_size, &header, &size);
188                 if (size > 0) {
189                         PARA_INFO_LOG("writing header (%zu bytes)\n", size);
190                         buf = para_malloc(size);
191                         memcpy(buf, header, size);
192                         btr_add_output(buf, size, btrn);
193                         afh_free_header(header, pard->audio_format_num);
194                 }
195         }
196         if (!j_given) {
197                 long unsigned n;
198                 for (n = pard->first_chunk; n < pard->last_chunk; n++) {
199                         ret = afh_get_chunk(n, afhi, pard->audio_format_num,
200                                 pard->map, pard->map_size, &start, &size,
201                                 &pard->afh_context);
202                         if (ret < 0)
203                                 goto out;
204                         PARA_DEBUG_LOG("adding %zu bytes\n", size);
205                         btr_add_output_dont_free(start, size, btrn);
206                 }
207                 ret = -E_RECV_EOF;
208                 goto out;
209         }
210         if (pard->current_chunk == pard->first_chunk)
211                 pard->stream_start = *now;
212         else {
213                 compute_chunk_time(pard->current_chunk - pard->first_chunk,
214                         &afhi->chunk_tv, &pard->stream_start, &chunk_time);
215                 ret = tv_diff(&chunk_time, now, NULL);
216                 if (ret > 0)
217                         goto out;
218         }
219         ret = afh_get_chunk(pard->current_chunk, afhi,
220                 pard->audio_format_num, pard->map,
221                 pard->map_size, &start, &size,
222                 &pard->afh_context);
223         if (ret < 0)
224                 goto out;
225         PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
226         btr_add_output_dont_free(start, size, btrn);
227         if (pard->current_chunk >= pard->last_chunk) {
228                 ret = -E_RECV_EOF;
229                 goto out;
230         }
231         pard->current_chunk++;
232         ret = 1;
233 out:
234         if (ret < 0) {
235                 btr_remove_node(&rn->btrn);
236                 pard->current_chunk = pard->first_chunk;
237         }
238         return ret;
239 }
240
241 /** See \ref recv_init(). */
242 const struct receiver lsg_recv_cmd_com_afh_user_data = {
243         .init = afh_init,
244         .open = afh_recv_open,
245         .close = afh_recv_close,
246         .pre_select = afh_recv_pre_select,
247         .post_select = afh_recv_post_select,
248         .execute = afh_execute,
249 };