1 /* Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
3 /** \file afh_recv.c Receiver for streaming local files. */
9 #include "recv_cmd.lsg.h"
14 #include "buffer_tree.h"
20 struct private_afh_recv_data {
26 long unsigned first_chunk;
27 long unsigned last_chunk;
28 struct timeval stream_start;
29 uint32_t current_chunk;
33 static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
35 struct receiver_node *rn = btr_context(btrn);
36 struct private_afh_recv_data *pard = rn->private_data;
39 if (!strcmp(cmd, "seconds_total")) {
40 *result = make_message("%" PRIu32, pard->afhi.seconds_total);
43 if (!strcmp(cmd, "chunks_total")) {
44 *result = make_message("%" PRIu32, pard->afhi.chunks_total);
47 if (!strcmp(cmd, "afhi")) {
48 afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
51 if (!strncmp(cmd, "repos", 5)) {
53 int ret = para_atoi32(cmd + 5, &x);
56 if (x >= pard->afhi.chunks_total)
57 return -ERRNO_TO_PARA_ERROR(EINVAL);
58 pard->first_chunk = afh_get_start_chunk(x, &pard->afhi,
59 pard->audio_format_num);
60 pard->current_chunk = pard->first_chunk;
66 static int afh_recv_open(struct receiver_node *rn)
68 struct lls_parse_result *lpr = rn->lpr;
69 struct private_afh_recv_data *pard;
70 struct afh_info *afhi;
71 const char *fn = RECV_CMD_OPT_STRING_VAL(AFH, FILENAME, lpr), *msg;
72 int32_t bc = RECV_CMD_OPT_INT32_VAL(AFH, BEGIN_CHUNK, lpr);
73 const struct lls_opt_result *r_e = RECV_CMD_OPT_RESULT(AFH, END_CHUNK, lpr);
76 if (!fn || *fn == '\0')
77 return -E_AFH_RECV_BAD_FILENAME;
78 rn->private_data = pard = para_calloc(sizeof(*pard));
80 ret = mmap_full_file(fn, O_RDONLY, &pard->map,
81 &pard->map_size, &pard->fd);
84 ret = compute_afhi(fn, pard->map, pard->map_size,
88 pard->audio_format_num = ret;
89 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
90 msg = "no data chunks";
91 if (afhi->chunks_total == 0)
93 msg = "invalid begin chunk";
94 if (PARA_ABS(bc) >= afhi->chunks_total)
97 pard->first_chunk = afh_get_start_chunk(bc, &pard->afhi,
98 pard->audio_format_num);
100 pard->first_chunk = afh_get_start_chunk(afhi->chunks_total + bc,
101 &pard->afhi, pard->audio_format_num);
102 if (lls_opt_given(r_e)) {
103 int32_t ec = lls_int32_val(0, r_e);
104 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
105 msg = "invalid end chunk";
106 if (PARA_ABS(ec) > afhi->chunks_total)
109 pard->last_chunk = ec;
111 pard->last_chunk = afhi->chunks_total + ec;
113 pard->last_chunk = afhi->chunks_total - 1;
114 ret = -ERRNO_TO_PARA_ERROR(EINVAL);
115 msg = "begin chunk >= end chunk!?";
116 if (pard->first_chunk >= pard->last_chunk)
118 pard->current_chunk = pard->first_chunk;
119 return pard->audio_format_num;
122 PARA_ERROR_LOG("%s: %s\n", fn, msg);
124 para_munmap(pard->map, pard->map_size);
127 freep(&rn->private_data);
131 static void afh_recv_close(struct receiver_node *rn)
133 struct private_afh_recv_data *pard;
135 if (!rn || !rn->private_data)
137 pard = rn->private_data;
138 clear_afhi(&pard->afhi);
139 para_munmap(pard->map, pard->map_size);
141 afh_close(pard->afh_context, pard->audio_format_num);
142 freep(&rn->private_data);
145 static void afh_recv_pre_select(struct sched *s, void *context)
147 struct receiver_node *rn = context;
148 struct private_afh_recv_data *pard = rn->private_data;
149 struct afh_info *afhi = &pard->afhi;
150 struct lls_parse_result *lpr = rn->lpr;
151 struct timeval chunk_time;
152 int state = generic_recv_pre_select(s, rn);
153 unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
161 compute_chunk_time(pard->current_chunk - pard->first_chunk,
162 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
163 sched_request_barrier_or_min_delay(&chunk_time, s);
166 static int afh_recv_post_select(__a_unused struct sched *s, void *context)
168 struct receiver_node *rn = context;
169 struct lls_parse_result *lpr = rn->lpr;
170 struct private_afh_recv_data *pard = rn->private_data;
171 struct btr_node *btrn = rn->btrn;
172 struct afh_info *afhi = &pard->afhi;
177 struct timeval chunk_time;
178 unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
179 unsigned H_given = RECV_CMD_OPT_GIVEN(AFH, NO_HEADER, lpr);
181 ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
184 if (pard->first_chunk > 0 && !H_given) {
186 afh_get_header(afhi, pard->audio_format_num, pard->map,
187 pard->map_size, &header, &size);
189 PARA_INFO_LOG("writing header (%zu bytes)\n", size);
190 buf = para_malloc(size);
191 memcpy(buf, header, size);
192 btr_add_output(buf, size, btrn);
193 afh_free_header(header, pard->audio_format_num);
198 for (n = pard->first_chunk; n < pard->last_chunk; n++) {
199 ret = afh_get_chunk(n, afhi, pard->audio_format_num,
200 pard->map, pard->map_size, &start, &size,
204 PARA_DEBUG_LOG("adding %zu bytes\n", size);
205 btr_add_output_dont_free(start, size, btrn);
210 if (pard->current_chunk == pard->first_chunk)
211 pard->stream_start = *now;
213 compute_chunk_time(pard->current_chunk - pard->first_chunk,
214 &afhi->chunk_tv, &pard->stream_start, &chunk_time);
215 ret = tv_diff(&chunk_time, now, NULL);
219 ret = afh_get_chunk(pard->current_chunk, afhi,
220 pard->audio_format_num, pard->map,
221 pard->map_size, &start, &size,
225 PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
226 btr_add_output_dont_free(start, size, btrn);
227 if (pard->current_chunk >= pard->last_chunk) {
231 pard->current_chunk++;
235 btr_remove_node(&rn->btrn);
236 pard->current_chunk = pard->first_chunk;
241 const struct receiver lsg_recv_cmd_com_afh_user_data = {
242 .open = afh_recv_open,
243 .close = afh_recv_close,
244 .pre_select = afh_recv_pre_select,
245 .post_select = afh_recv_post_select,
246 .execute = afh_execute,