afs: Document ->lpr of struct afs_callback_arg.
[paraslash.git] / afh_recv.c
1 /* Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file afh_recv.c Receiver for streaming local files. */
4
5 #include <regex.h>
6 #include <sys/types.h>
7 #include <lopsub.h>
8
9 #include "recv_cmd.lsg.h"
10 #include "para.h"
11 #include "error.h"
12 #include "list.h"
13 #include "sched.h"
14 #include "buffer_tree.h"
15 #include "recv.h"
16 #include "string.h"
17 #include "fd.h"
18 #include "afh.h"
19
20 struct private_afh_recv_data {
21 int fd;
22 void *map;
23 size_t map_size;
24 struct afh_info afhi;
25 int audio_format_num;
26 long unsigned first_chunk;
27 long unsigned last_chunk;
28 struct timeval stream_start;
29 uint32_t current_chunk;
30 void *afh_context;
31 };
32
33 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
34 {
35 struct receiver_node *rn = btr_context(btrn);
36 struct private_afh_recv_data *pard = rn->private_data;
37
38 *result = NULL;
39 if (!strcmp(cmd, "seconds_total")) {
40 *result = make_message("%" PRIu32, pard->afhi.seconds_total);
41 return 1;
42 }
43 if (!strcmp(cmd, "chunks_total")) {
44 *result = make_message("%" PRIu32, pard->afhi.chunks_total);
45 return 1;
46 }
47 if (!strcmp(cmd, "afhi")) {
48 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
49 return 1;
50 }
51 if (!strncmp(cmd, "repos", 5)) {
52 int32_t x;
53 int ret = para_atoi32(cmd + 5, &x);
54 if (ret < 0)
55 return ret;
56 if (x >= pard->afhi.chunks_total)
57 return -ERRNO_TO_PARA_ERROR(EINVAL);
58 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi,
59 pard->audio_format_num);
60 pard->current_chunk = pard->first_chunk;
61 return 1;
62 }
63 return -E_BTR_NAVAIL;
64 }
65
66 static int afh_recv_open(struct receiver_node *rn)
67 {
68 struct lls_parse_result *lpr = rn->lpr;
69 struct private_afh_recv_data *pard;
70 struct afh_info *afhi;
71 const char *fn = RECV_CMD_OPT_STRING_VAL(AFH, FILENAME, lpr);
72 int32_t bc = RECV_CMD_OPT_INT32_VAL(AFH, BEGIN_CHUNK, lpr);
73 const struct lls_opt_result *r_e = RECV_CMD_OPT_RESULT(AFH, END_CHUNK, lpr);
74 int ret;
75
76 if (!fn || *fn == '\0')
77 return -E_AFH_RECV_BAD_FILENAME;
78 rn->private_data = pard = para_calloc(sizeof(*pard));
79 afhi = &pard->afhi;
80 ret = mmap_full_file(fn, O_RDONLY, &pard->map,
81 &pard->map_size, &pard->fd);
82 if (ret < 0)
83 goto out;
84 ret = compute_afhi(fn, pard->map, pard->map_size,
85 pard->fd, afhi);
86 if (ret < 0)
87 goto out_unmap;
88 pard->audio_format_num = ret;
89 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
90 if (afhi->chunks_total == 0)
91 goto out_clear_afhi;
92 if (PARA_ABS(bc) >= afhi->chunks_total)
93 goto out_clear_afhi;
94 if (bc >= 0)
95 pard->first_chunk = afh_get_start_chunk(bc, &pard->afhi,
96 pard->audio_format_num);
97 else
98 pard->first_chunk = afh_get_start_chunk(afhi->chunks_total + bc,
99 &pard->afhi, pard->audio_format_num);
100 if (lls_opt_given(r_e)) {
101 int32_t ec = lls_int32_val(0, r_e);
102 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
103 if (PARA_ABS(ec) > afhi->chunks_total)
104 goto out_clear_afhi;
105 if (ec >= 0)
106 pard->last_chunk = ec;
107 else
108 pard->last_chunk = afhi->chunks_total + ec;
109 } else
110 pard->last_chunk = afhi->chunks_total - 1;
111 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
112 if (pard->first_chunk >= pard->last_chunk)
113 goto out_clear_afhi;
114 pard->current_chunk = pard->first_chunk;
115 return pard->audio_format_num;
116 out_clear_afhi:
117 clear_afhi(afhi);
118 out_unmap:
119 para_munmap(pard->map, pard->map_size);
120 close(pard->fd);
121 out:
122 freep(&rn->private_data);
123 return ret;
124 }
125
126 static void afh_recv_close(struct receiver_node *rn)
127 {
128 struct private_afh_recv_data *pard;
129
130 if (!rn || !rn->private_data)
131 return;
132 pard = rn->private_data;
133 clear_afhi(&pard->afhi);
134 para_munmap(pard->map, pard->map_size);
135 close(pard->fd);
136 afh_close(pard->afh_context, pard->audio_format_num);
137 freep(&rn->private_data);
138 }
139
140 static void afh_recv_pre_select(struct sched *s, void *context)
141 {
142 struct receiver_node *rn = context;
143 struct private_afh_recv_data *pard = rn->private_data;
144 struct afh_info *afhi = &pard->afhi;
145 struct lls_parse_result *lpr = rn->lpr;
146 struct timeval chunk_time;
147 int state = generic_recv_pre_select(s, rn);
148 unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
149
150 if (state <= 0)
151 return;
152 if (!j_given) {
153 sched_min_delay(s);
154 return;
155 }
156 compute_chunk_time(pard->current_chunk - pard->first_chunk,
157 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
158 sched_request_barrier_or_min_delay(&chunk_time, s);
159 }
160
161 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
162 {
163 struct receiver_node *rn = context;
164 struct lls_parse_result *lpr = rn->lpr;
165 struct private_afh_recv_data *pard = rn->private_data;
166 struct btr_node *btrn = rn->btrn;
167 struct afh_info *afhi = &pard->afhi;
168 int ret;
169 char *buf;
170 const char *start;
171 size_t size;
172 struct timeval chunk_time;
173 unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
174 unsigned H_given = RECV_CMD_OPT_GIVEN(AFH, NO_HEADER, lpr);
175
176 ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
177 if (ret <= 0)
178 goto out;
179 if (pard->first_chunk > 0 && !H_given) {
180 char *header;
181 afh_get_header(afhi, pard->audio_format_num, pard->map,
182 pard->map_size, &header, &size);
183 if (size > 0) {
184 PARA_INFO_LOG("writing header (%zu bytes)\n", size);
185 buf = para_malloc(size);
186 memcpy(buf, header, size);
187 btr_add_output(buf, size, btrn);
188 afh_free_header(header, pard->audio_format_num);
189 }
190 }
191 if (!j_given) {
192 long unsigned n;
193 for (n = pard->first_chunk; n < pard->last_chunk; n++) {
194 ret = afh_get_chunk(n, afhi, pard->audio_format_num,
195 pard->map, pard->map_size, &start, &size,
196 &pard->afh_context);
197 if (ret < 0)
198 goto out;
199 PARA_DEBUG_LOG("adding %zu bytes\n", size);
200 btr_add_output_dont_free(start, size, btrn);
201 }
202 ret = -E_RECV_EOF;
203 goto out;
204 }
205 if (pard->current_chunk == pard->first_chunk)
206 pard->stream_start = *now;
207 else {
208 compute_chunk_time(pard->current_chunk - pard->first_chunk,
209 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
210 ret = tv_diff(&chunk_time, now, NULL);
211 if (ret > 0)
212 goto out;
213 }
214 ret = afh_get_chunk(pard->current_chunk, afhi,
215 pard->audio_format_num, pard->map,
216 pard->map_size, &start, &size,
217 &pard->afh_context);
218 if (ret < 0)
219 goto out;
220 PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
221 btr_add_output_dont_free(start, size, btrn);
222 if (pard->current_chunk >= pard->last_chunk) {
223 ret = -E_RECV_EOF;
224 goto out;
225 }
226 pard->current_chunk++;
227 ret = 1;
228 out:
229 if (ret < 0) {
230 btr_remove_node(&rn->btrn);
231 pard->current_chunk = pard->first_chunk;
232 }
233 return ret;
234 }
235
236 /** See \ref recv_init(). */
237 const struct receiver lsg_recv_cmd_com_afh_user_data = {
238 .init = afh_init,
239 .open = afh_recv_open,
240 .close = afh_recv_close,
241 .pre_select = afh_recv_pre_select,
242 .post_select = afh_recv_post_select,
243 .execute = afh_execute,
244 };