1 /* Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
3 /** \file afh_recv.c Receiver for streaming local files. */
9 #include "recv_cmd.lsg.h"
14 #include "buffer_tree.h"
/**
 * Per-instance state of the afh receiver.
 *
 * An instance is allocated with zalloc() in afh_recv_open() and is reached
 * through the private_data pointer of the receiver node.
 */
20 struct private_afh_recv_data
{
/** Result of afh_get_start_chunk(), assigned in open and on "repos". */
26 long unsigned first_chunk
;
/** Assigned in open from the end-chunk option or from chunks_total - 1. */
27 long unsigned last_chunk
;
/** Set to *now in post_monitor while current_chunk equals first_chunk. */
28 struct timeval stream_start
;
/** Chunk number passed to afh_get_chunk(); initialized to first_chunk. */
29 uint32_t current_chunk
;
/**
 * The execute method of the afh receiver.
 *
 * Compares cmd against "seconds_total", "chunks_total", "afhi" and the
 * "repos" prefix. The first three store a text answer through result,
 * "repos" changes the chunk position of the stream.
 *
 * \param btrn The receiver node is obtained from it via btr_context().
 * \param cmd The command string to examine.
 * \param result Written to by the query commands.
 */
33 static int afh_execute(struct btr_node
*btrn
, const char *cmd
, char **result
)
35 struct receiver_node
*rn
= btr_context(btrn
);
36 struct private_afh_recv_data
*pard
= rn
->private_data
;
/* Answer with the seconds_total field of the afh info (PRIu32 format). */
39 if (!strcmp(cmd
, "seconds_total")) {
40 *result
= make_message("%" PRIu32
, pard
->afhi
.seconds_total
);
/* Answer with the chunks_total field of the afh info (PRIu32 format). */
43 if (!strcmp(cmd
, "chunks_total")) {
44 *result
= make_message("%" PRIu32
, pard
->afhi
.chunks_total
);
/* Let afh_get_afhi_txt() produce the answer from the afh info. */
47 if (!strcmp(cmd
, "afhi")) {
48 afh_get_afhi_txt(pard
->audio_format_num
, &pard
->afhi
, result
);
/* Only the first five characters are compared; the rest is the argument. */
51 if (!strncmp(cmd
, "repos", 5)) {
/* Parse the text which follows the prefix into x. */
53 int ret
= para_atoi32(cmd
+ 5, &x
);
/* Values at or beyond the number of chunks are rejected with EINVAL. */
56 if (x
>= pard
->afhi
.chunks_total
)
57 return -ERRNO_TO_PARA_ERROR(EINVAL
);
/* Continue at the start chunk which afh_get_start_chunk() returns for x. */
58 pard
->first_chunk
= afh_get_start_chunk(x
, &pard
->afhi
,
59 pard
->audio_format_num
);
60 pard
->current_chunk
= pard
->first_chunk
;
/**
 * The open method of the afh receiver.
 *
 * Reads the filename, begin-chunk and end-chunk options from the parse
 * result of the receiver node, allocates the private data, maps the file,
 * computes its afh info and determines the first, last and current chunk.
 *
 * \param rn The receiver node; its private_data pointer is set here.
 *
 * \return The audio format number on success, -E_AFH_RECV_BAD_FILENAME if
 * the filename is NULL or empty. The other failure cases set ret to a
 * negative value before the cleanup code at the end runs.
 */
66 static int afh_recv_open(struct receiver_node
*rn
)
68 struct lls_parse_result
*lpr
= rn
->lpr
;
69 struct private_afh_recv_data
*pard
;
70 struct afh_info
*afhi
;
71 const char *fn
= RECV_CMD_OPT_STRING_VAL(AFH
, FILENAME
, lpr
), *msg
;
72 int32_t bc
= RECV_CMD_OPT_INT32_VAL(AFH
, BEGIN_CHUNK
, lpr
);
73 const struct lls_opt_result
*r_e
= RECV_CMD_OPT_RESULT(AFH
, END_CHUNK
, lpr
);
/* Refuse a missing or empty filename before anything is allocated. */
76 if (!fn
|| *fn
== '\0')
77 return -E_AFH_RECV_BAD_FILENAME
;
78 rn
->private_data
= pard
= zalloc(sizeof(*pard
));
/* Map the file read-only; map, size and fd are stored in the private data. */
80 ret
= mmap_full_file(fn
, O_RDONLY
, &pard
->map
,
81 &pard
->map_size
, &pard
->fd
);
/* The value returned by compute_afhi() is kept as the audio format number. */
84 ret
= compute_afhi(fn
, pard
->map
, pard
->map_size
,
88 pard
->audio_format_num
= ret
;
/*
 * For each of the following sanity checks, ret and msg are set up front.
 * msg is the text which PARA_ERROR_LOG() prints further below.
 */
89 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
90 msg
= "no data chunks";
91 if (afhi
->chunks_total
== 0)
93 msg
= "invalid begin chunk";
/* The absolute value is checked since the begin chunk is a signed option. */
94 if (PARA_ABS(bc
) >= afhi
->chunks_total
)
/* Two ways to compute the first chunk: from bc or from chunks_total + bc. */
97 pard
->first_chunk
= afh_get_start_chunk(bc
, &pard
->afhi
,
98 pard
->audio_format_num
);
/* NOTE(review): presumably the negative case, counting from the end. */
100 pard
->first_chunk
= afh_get_start_chunk(afhi
->chunks_total
+ bc
,
101 &pard
->afhi
, pard
->audio_format_num
);
/* The end chunk option is only examined if it was given. */
102 if (lls_opt_given(r_e
)) {
103 int32_t ec
= lls_int32_val(0, r_e
);
104 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
105 msg
= "invalid end chunk";
/* Unlike the begin chunk check, this comparison uses ">", not ">=". */
106 if (PARA_ABS(ec
) > afhi
->chunks_total
)
109 pard
->last_chunk
= ec
;
111 pard
->last_chunk
= afhi
->chunks_total
+ ec
;
/* Index of the final chunk of the file. */
113 pard
->last_chunk
= afhi
->chunks_total
- 1;
114 ret
= -ERRNO_TO_PARA_ERROR(EINVAL
);
115 msg
= "begin chunk >= end chunk!?";
116 if (pard
->first_chunk
>= pard
->last_chunk
)
/* Success: start at the first chunk and return the audio format number. */
118 pard
->current_chunk
= pard
->first_chunk
;
119 return pard
->audio_format_num
;
/* Cleanup: log file name and reason, unmap the file, free private data. */
122 PARA_ERROR_LOG("%s: %s\n", fn
, msg
);
124 para_munmap(pard
->map
, pard
->map_size
);
127 freep(&rn
->private_data
);
/**
 * The close method of the afh receiver.
 *
 * Clears the afh info, unmaps the file, closes the afh context and frees
 * the private data of the receiver node.
 *
 * \param rn The receiver node. Both rn and rn->private_data are checked
 * against NULL before the private data is accessed.
 */
131 static void afh_recv_close(struct receiver_node
*rn
)
133 struct private_afh_recv_data
*pard
;
/* Guard against a NULL node and against a node without private data. */
135 if (!rn
|| !rn
->private_data
)
137 pard
= rn
->private_data
;
138 clear_afhi(&pard
->afhi
);
139 para_munmap(pard
->map
, pard
->map_size
);
141 afh_close(pard
->afh_context
, pard
->audio_format_num
);
/* freep() receives the address of the pointer rather than the pointer. */
142 freep(&rn
->private_data
);
/**
 * The pre_monitor method of the afh receiver.
 *
 * Calls generic_recv_pre_monitor(), looks up whether the just-in-time
 * option was given, computes the time of the current chunk and passes it
 * to sched_request_barrier_or_min_delay().
 *
 * \param s The scheduler instance.
 * \param context Pointer to the receiver node.
 */
145 static void afh_recv_pre_monitor(struct sched
*s
, void *context
)
147 struct receiver_node
*rn
= context
;
148 struct private_afh_recv_data
*pard
= rn
->private_data
;
149 struct afh_info
*afhi
= &pard
->afhi
;
150 struct lls_parse_result
*lpr
= rn
->lpr
;
151 struct timeval chunk_time
;
152 int state
= generic_recv_pre_monitor(s
, rn
);
153 unsigned j_given
= RECV_CMD_OPT_GIVEN(AFH
, JUST_IN_TIME
, lpr
);
/*
 * The chunk number is taken relative to first_chunk. The computation also
 * gets the chunk_tv field of the afh info and the stream start time.
 */
161 compute_chunk_time(pard
->current_chunk
- pard
->first_chunk
,
162 &afhi
->chunk_tv
, &pard
->stream_start
, &chunk_time
);
163 sched_request_barrier_or_min_delay(&chunk_time
, s
);
/**
 * The post_monitor method of the afh receiver.
 *
 * Feeds data of the memory mapped file to the buffer tree node of the
 * receiver: the audio file header under the condition noted below, then
 * chunks obtained from afh_get_chunk(). Two code paths add chunks, a loop
 * over the chunk range and a path which handles only the current chunk.
 *
 * \param s Unused.
 * \param context Pointer to the receiver node.
 */
166 static int afh_recv_post_monitor(__a_unused
struct sched
*s
, void *context
)
168 struct receiver_node
*rn
= context
;
169 struct lls_parse_result
*lpr
= rn
->lpr
;
170 struct private_afh_recv_data
*pard
= rn
->private_data
;
171 struct btr_node
*btrn
= rn
->btrn
;
172 struct afh_info
*afhi
= &pard
->afhi
;
178 struct timeval chunk_time
;
179 unsigned j_given
= RECV_CMD_OPT_GIVEN(AFH
, JUST_IN_TIME
, lpr
);
180 unsigned H_given
= RECV_CMD_OPT_GIVEN(AFH
, NO_HEADER
, lpr
);
/* Query the status of the node, which is of type BTR_NT_ROOT. */
182 ret
= btr_node_status(btrn
, 0, BTR_NT_ROOT
);
/*
 * A header is requested only if the stream does not start at chunk zero
 * and the no-header option was not given.
 */
185 if (pard
->first_chunk
> 0 && !H_given
) {
187 afh_get_header(afhi
, pard
->audio_format_num
, pard
->map
,
188 pard
->map_size
, &header
, &size
);
190 PARA_INFO_LOG("writing header (%zu bytes)\n", size
);
/* A copy in buf is added as output, then the header itself is released. */
192 memcpy(buf
, header
, size
);
193 btr_add_output(buf
, size
, btrn
);
194 afh_free_header(header
, pard
->audio_format_num
);
/* Loop over the chunks from first_chunk up to, but excluding, last_chunk. */
199 for (n
= pard
->first_chunk
; n
< pard
->last_chunk
; n
++) {
200 ret
= afh_get_chunk(n
, afhi
, pard
->audio_format_num
,
201 pard
->map
, pard
->map_size
, &start
, &len
,
205 PARA_DEBUG_LOG("adding %u bytes\n", len
);
/* NOTE(review): presumably start points into the memory map -- confirm. */
206 btr_add_output_dont_free(start
, len
, btrn
);
/* Remember the current time while still at the first chunk. */
211 if (pard
->current_chunk
== pard
->first_chunk
)
212 pard
->stream_start
= *now
;
/* Same chunk time computation as in the pre_monitor method. */
214 compute_chunk_time(pard
->current_chunk
- pard
->first_chunk
,
215 &afhi
->chunk_tv
, &pard
->stream_start
, &chunk_time
);
/* Compare the computed chunk time against the current time. */
216 ret
= tv_diff(&chunk_time
, now
, NULL
);
/* Fetch the current chunk and add it as output. */
220 ret
= afh_get_chunk(pard
->current_chunk
, afhi
,
221 pard
->audio_format_num
, pard
->map
,
222 pard
->map_size
, &start
, &len
,
226 PARA_DEBUG_LOG("adding chunk %u\n", pard
->current_chunk
);
227 btr_add_output_dont_free(start
, len
, btrn
);
/* Check whether last_chunk has been reached. */
228 if (pard
->current_chunk
>= pard
->last_chunk
) {
232 pard
->current_chunk
++;
/*
 * NOTE(review): presumably the end-of-stream/error path -- confirm. The
 * node is removed and the position is reset to the first chunk.
 */
236 btr_remove_node(&rn
->btrn
);
237 pard
->current_chunk
= pard
->first_chunk
;
/**
 * Method table of the afh receiver.
 *
 * Each member is set to the static function of this file which implements
 * the corresponding method.
 */
242 const struct receiver lsg_recv_cmd_com_afh_user_data
= {
243 .open
= afh_recv_open
,
244 .close
= afh_recv_close
,
245 .pre_monitor
= afh_recv_pre_monitor
,
246 .post_monitor
= afh_recv_post_monitor
,
247 .execute
= afh_execute
,