Merge branch 'refs/heads/t/doc'
[paraslash.git] / afh_recv.c
index 550ca424376ad2108557ef17159265e4667ad3eb..6525209bff4119a177eb443f154b4f1f5224845b 100644 (file)
+/* Copyright (C) 2011 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
+
+/** \file afh_recv.c Receiver for streaming local files. */
+
 #include <regex.h>
 #include <sys/types.h>
-#include <stdbool.h>
+#include <lopsub.h>
 
+#include "recv_cmd.lsg.h"
 #include "para.h"
 #include "error.h"
 #include "list.h"
 #include "sched.h"
-#include "ggo.h"
 #include "buffer_tree.h"
 #include "recv.h"
-#include "afh_recv.cmdline.h"
 #include "string.h"
 #include "fd.h"
+#include "afh.h"
+
+struct private_afh_recv_data {
+       int fd;
+       void *map;
+       size_t map_size;
+       struct afh_info afhi;
+       int audio_format_num;
+       long unsigned first_chunk;
+       long unsigned last_chunk;
+       struct timeval stream_start;
+       uint32_t current_chunk;
+       void *afh_context;
+};
 
-/**
- * The init function of the afh receiver.
- *
- * \param r Pointer to the receiver struct to initialize.
- *
- * This initializes all function pointers of \a r.
- */
-void afh_recv_init(struct receiver *r)
+static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
 {
-       struct afh_recv_args_info dummy;
-
-       afh_recv_cmdline_parser_init(&dummy);
-#if 0
-       r->open = afh_recv_open;
-       r->close = afh_recv_close;
-       r->pre_select = afh_recv_pre_select;
-       r->post_select = afh_recv_post_select;
-       r->parse_config = afh_recv_parse_config;
-       r->free_config = afh_recv_free_config;
-#endif
-       r->help = (struct ggo_help) {
-               .short_help = afh_recv_args_info_help,
-               .detailed_help = afh_recv_args_info_detailed_help
-       };
-       afh_recv_cmdline_parser_free(&dummy);
+       struct receiver_node *rn = btr_context(btrn);
+       struct private_afh_recv_data *pard = rn->private_data;
+
+       *result = NULL;
+       if (!strcmp(cmd, "seconds_total")) {
+               *result = make_message("%" PRIu32, pard->afhi.seconds_total);
+               return 1;
+       }
+       if (!strcmp(cmd, "chunks_total")) {
+               *result = make_message("%" PRIu32, pard->afhi.chunks_total);
+               return 1;
+       }
+       if (!strcmp(cmd, "afhi")) {
+               afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
+               return 1;
+       }
+       if (!strncmp(cmd, "repos", 5)) {
+               int32_t x;
+               int ret = para_atoi32(cmd + 5, &x);
+               if (ret < 0)
+                       return ret;
+               if (x >= pard->afhi.chunks_total)
+                       return -ERRNO_TO_PARA_ERROR(EINVAL);
+               pard->first_chunk = afh_get_start_chunk(x, &pard->afhi,
+                       pard->audio_format_num);
+               pard->current_chunk = pard->first_chunk;
+               return 1;
+       }
+       return -E_BTR_NAVAIL;
 }
+
+static int afh_recv_open(struct receiver_node *rn)
+{
+       struct lls_parse_result *lpr = rn->lpr;
+       struct private_afh_recv_data *pard;
+       struct afh_info *afhi;
+       const char *fn = RECV_CMD_OPT_STRING_VAL(AFH, FILENAME, lpr), *msg;
+       int32_t bc = RECV_CMD_OPT_INT32_VAL(AFH, BEGIN_CHUNK, lpr);
+       const struct lls_opt_result *r_e = RECV_CMD_OPT_RESULT(AFH, END_CHUNK, lpr);
+       int ret;
+
+       if (!fn || *fn == '\0')
+               return -E_AFH_RECV_BAD_FILENAME;
+       rn->private_data = pard = para_calloc(sizeof(*pard));
+       afhi = &pard->afhi;
+       ret = mmap_full_file(fn, O_RDONLY, &pard->map,
+               &pard->map_size, &pard->fd);
+       if (ret < 0)
+               goto out;
+       ret = compute_afhi(fn, pard->map, pard->map_size,
+               pard->fd, afhi);
+       if (ret < 0)
+               goto out_unmap;
+       pard->audio_format_num = ret;
+       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+       msg = "no data chunks";
+       if (afhi->chunks_total == 0)
+               goto out_clear_afhi;
+       msg = "invalid begin chunk";
+       if (PARA_ABS(bc) >= afhi->chunks_total)
+               goto out_clear_afhi;
+       if (bc >= 0)
+               pard->first_chunk = afh_get_start_chunk(bc, &pard->afhi,
+                       pard->audio_format_num);
+       else
+               pard->first_chunk = afh_get_start_chunk(afhi->chunks_total + bc,
+                       &pard->afhi, pard->audio_format_num);
+       if (lls_opt_given(r_e)) {
+               int32_t ec = lls_int32_val(0, r_e);
+               ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+               msg = "invalid end chunk";
+               if (PARA_ABS(ec) > afhi->chunks_total)
+                       goto out_clear_afhi;
+               if (ec >= 0)
+                       pard->last_chunk = ec;
+               else
+                       pard->last_chunk = afhi->chunks_total + ec;
+       } else
+               pard->last_chunk = afhi->chunks_total - 1;
+       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+       msg = "begin chunk >= end chunk!?";
+       if (pard->first_chunk >= pard->last_chunk)
+               goto out_clear_afhi;
+       pard->current_chunk = pard->first_chunk;
+       return pard->audio_format_num;
+out_clear_afhi:
+       clear_afhi(afhi);
+       PARA_ERROR_LOG("%s: %s\n", fn, msg);
+out_unmap:
+       para_munmap(pard->map, pard->map_size);
+       close(pard->fd);
+out:
+       freep(&rn->private_data);
+       return ret;
+}
+
+static void afh_recv_close(struct receiver_node *rn)
+{
+       struct private_afh_recv_data *pard;
+
+       if (!rn || !rn->private_data)
+               return;
+       pard = rn->private_data;
+       clear_afhi(&pard->afhi);
+       para_munmap(pard->map, pard->map_size);
+       close(pard->fd);
+       afh_close(pard->afh_context, pard->audio_format_num);
+       freep(&rn->private_data);
+}
+
+static void afh_recv_pre_select(struct sched *s, void *context)
+{
+       struct receiver_node *rn = context;
+       struct private_afh_recv_data *pard = rn->private_data;
+       struct afh_info *afhi = &pard->afhi;
+       struct lls_parse_result *lpr = rn->lpr;
+       struct timeval chunk_time;
+       int state = generic_recv_pre_select(s, rn);
+       unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
+
+       if (state <= 0)
+               return;
+       if (!j_given) {
+               sched_min_delay(s);
+               return;
+       }
+       compute_chunk_time(pard->current_chunk - pard->first_chunk,
+               &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+       sched_request_barrier_or_min_delay(&chunk_time, s);
+}
+
+static int afh_recv_post_select(__a_unused struct sched *s, void *context)
+{
+       struct receiver_node *rn = context;
+       struct lls_parse_result *lpr = rn->lpr;
+       struct private_afh_recv_data *pard = rn->private_data;
+       struct btr_node *btrn = rn->btrn;
+       struct afh_info *afhi = &pard->afhi;
+       int ret;
+       char *buf;
+       const char *start;
+       size_t size;
+       struct timeval chunk_time;
+       unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
+       unsigned H_given = RECV_CMD_OPT_GIVEN(AFH, NO_HEADER, lpr);
+
+       ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+       if (ret <= 0)
+               goto out;
+       if (pard->first_chunk > 0 && !H_given) {
+               char *header;
+               afh_get_header(afhi, pard->audio_format_num, pard->map,
+                       pard->map_size, &header, &size);
+               if (size > 0) {
+                       PARA_INFO_LOG("writing header (%zu bytes)\n", size);
+                       buf = para_malloc(size);
+                       memcpy(buf, header, size);
+                       btr_add_output(buf, size, btrn);
+                       afh_free_header(header, pard->audio_format_num);
+               }
+       }
+       if (!j_given) {
+               long unsigned n;
+               for (n = pard->first_chunk; n < pard->last_chunk; n++) {
+                       ret = afh_get_chunk(n, afhi, pard->audio_format_num,
+                               pard->map, pard->map_size, &start, &size,
+                               &pard->afh_context);
+                       if (ret < 0)
+                               goto out;
+                       PARA_DEBUG_LOG("adding %zu bytes\n", size);
+                       btr_add_output_dont_free(start, size, btrn);
+               }
+               ret = -E_RECV_EOF;
+               goto out;
+       }
+       if (pard->current_chunk == pard->first_chunk)
+               pard->stream_start = *now;
+       else {
+               compute_chunk_time(pard->current_chunk - pard->first_chunk,
+                       &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+               ret = tv_diff(&chunk_time, now, NULL);
+               if (ret > 0)
+                       goto out;
+       }
+       ret = afh_get_chunk(pard->current_chunk, afhi,
+               pard->audio_format_num, pard->map,
+               pard->map_size, &start, &size,
+               &pard->afh_context);
+       if (ret < 0)
+               goto out;
+       PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
+       btr_add_output_dont_free(start, size, btrn);
+       if (pard->current_chunk >= pard->last_chunk) {
+               ret = -E_RECV_EOF;
+               goto out;
+       }
+       pard->current_chunk++;
+       ret = 1;
+out:
+       if (ret < 0) {
+               btr_remove_node(&rn->btrn);
+               pard->current_chunk = pard->first_chunk;
+       }
+       return ret;
+}
+
+/** See \ref recv_init(). */
+const struct receiver lsg_recv_cmd_com_afh_user_data = {
+       .init = afh_init,
+       .open = afh_recv_open,
+       .close = afh_recv_close,
+       .pre_select = afh_recv_pre_select,
+       .post_select = afh_recv_post_select,
+       .execute = afh_execute,
+};