]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
The afh receiver, implementation.
authorAndre Noll <maan@systemlinux.org>
Sun, 29 Jul 2012 12:41:11 +0000 (14:41 +0200)
committerAndre Noll <maan@systemlinux.org>
Sun, 18 Nov 2012 19:28:28 +0000 (20:28 +0100)
This replaces the dummy implementation added in the previous patch
by a working one. It employs the recently added ->execute method for
receivers to let other buffer tree nodes query information about the
current audio file and reposition the stream.

afh_recv.c
error.h

index 550ca424376ad2108557ef17159265e4667ad3eb..d042398bef6f30d4a1a385f125d94efff0e172d1 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (C) 2011-2012 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file afh_recv.c Receiver for streaming local files. */
+
 #include <regex.h>
 #include <sys/types.h>
 #include <stdbool.h>
 #include <regex.h>
 #include <sys/types.h>
 #include <stdbool.h>
 #include "afh_recv.cmdline.h"
 #include "string.h"
 #include "fd.h"
 #include "afh_recv.cmdline.h"
 #include "string.h"
 #include "fd.h"
+#include "afh.h"
+
+struct private_afh_recv_data {
+       int fd;
+       void *map;
+       size_t map_size;
+       struct afh_info afhi;
+       int audio_format_num;
+       long unsigned first_chunk;
+       long unsigned last_chunk;
+       struct timeval stream_start;
+       uint32_t current_chunk;
+};
+
+static int afh_execute(struct btr_node *btrn, const char *cmd, char **result)
+{
+       struct receiver_node *rn = btr_context(btrn);
+       struct private_afh_recv_data *pard = rn->private_data;
+
+       *result = NULL;
+       if (!strcmp(cmd, "seconds_total")) {
+               *result = make_message("%lu", pard->afhi.seconds_total);
+               return 1;
+       }
+       if (!strcmp(cmd, "chunks_total")) {
+               *result = make_message("%lu", pard->afhi.chunks_total);
+               return 1;
+       }
+       if (!strcmp(cmd, "afhi")) {
+               afh_get_afhi_txt(pard->audio_format_num, &pard->afhi, result);
+               return 1;
+       }
+       if (!strncmp(cmd, "repos", 5)) {
+               int32_t x;
+               int ret = para_atoi32(cmd + 5, &x);
+               if (ret < 0)
+                       return ret;
+               if (x >= pard->afhi.chunks_total)
+                       return -ERRNO_TO_PARA_ERROR(EINVAL);
+               pard->first_chunk = pard->current_chunk = x;
+               rn->task.error = 0;
+               return 1;
+       }
+       return -E_BTR_NAVAIL;
+}
+
+static void *afh_recv_parse_config(int argc, char **argv)
+{
+       struct afh_recv_args_info *tmp = para_calloc(sizeof(*tmp));
+
+       if (!afh_recv_cmdline_parser(argc, argv, tmp))
+               return tmp;
+       free(tmp);
+       return NULL;
+}
+
+static void afh_recv_free_config(void *conf)
+{
+       if (!conf)
+               return;
+       afh_recv_cmdline_parser_free(conf);
+       free(conf);
+}
+
+static int afh_recv_open(struct receiver_node *rn)
+{
+       struct afh_recv_args_info *conf = rn->conf;
+       struct private_afh_recv_data *pard;
+       struct afh_info *afhi;
+       char *filename = conf->filename_arg;
+
+       int ret;
+
+       if (!filename || *filename == '\0')
+               return -E_AFH_RECV_BAD_FILENAME;
+       rn->private_data = pard = para_calloc(sizeof(*pard));
+       afhi = &pard->afhi;
+       ret = mmap_full_file(filename, O_RDONLY, &pard->map,
+               &pard->map_size, &pard->fd);
+       if (ret < 0)
+               goto out;
+       ret = compute_afhi(filename, pard->map, pard->map_size,
+               pard->fd, afhi);
+       if (ret < 0)
+               goto out_unmap;
+       pard->audio_format_num = ret;
+       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+       if (afhi->chunks_total == 0)
+               goto out_clear_afhi;
+       if (PARA_ABS(conf->begin_chunk_arg) >= afhi->chunks_total)
+               goto out_clear_afhi;
+       if (conf->begin_chunk_arg >= 0)
+               pard->first_chunk = conf->begin_chunk_arg;
+       else
+               pard->first_chunk = afhi->chunks_total + conf->begin_chunk_arg;
+       if (conf->end_chunk_given) {
+               ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+               if (PARA_ABS(conf->end_chunk_arg) > afhi->chunks_total)
+                       goto out_clear_afhi;
+               if (conf->end_chunk_arg >= 0)
+                       pard->last_chunk = conf->end_chunk_arg;
+               else
+                       pard->last_chunk = afhi->chunks_total + conf->end_chunk_arg;
+       } else
+               pard->last_chunk = afhi->chunks_total - 1;
+       ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+       if (pard->first_chunk >= pard->last_chunk)
+               goto out_clear_afhi;
+       pard->current_chunk = pard->first_chunk;
+       return pard->audio_format_num;
+out_clear_afhi:
+       clear_afhi(afhi);
+out_unmap:
+       para_munmap(pard->map, pard->map_size);
+       close(pard->fd);
+out:
+       freep(&rn->private_data);
+       return ret;
+}
+
+static void afh_recv_close(struct receiver_node *rn)
+{
+       struct private_afh_recv_data *pard;
+
+       if (!rn || !rn->private_data)
+               return;
+       pard = rn->private_data;
+       clear_afhi(&pard->afhi);
+       para_munmap(pard->map, pard->map_size);
+       close(pard->fd);
+       freep(&rn->private_data);
+}
+
+static void afh_recv_pre_select(struct sched *s, struct task *t)
+{
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct private_afh_recv_data *pard = rn->private_data;
+       struct afh_info *afhi = &pard->afhi;
+       struct afh_recv_args_info *conf = rn->conf;
+       struct timeval chunk_time;
+       int state = generic_recv_pre_select(s, t);
+
+       if (state <= 0)
+               return;
+       if (!conf->just_in_time_given) {
+               sched_min_delay(s);
+               return;
+       }
+       compute_chunk_time(pard->current_chunk - pard->first_chunk,
+               &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+       sched_request_barrier_or_min_delay(&chunk_time, s);
+}
+
+static void afh_recv_post_select(__a_unused struct sched *s, struct task *t)
+{
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct afh_recv_args_info *conf = rn->conf;
+       struct private_afh_recv_data *pard = rn->private_data;
+       struct btr_node *btrn = rn->btrn;
+       struct afh_info *afhi = &pard->afhi;
+       int ret;
+       char *buf;
+       const char *start, *end;
+       size_t size;
+       struct timeval chunk_time;
+
+       ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+       if (ret <= 0)
+               goto out;
+       if (pard->first_chunk > 0 && !conf->no_header_given) {
+               char *header;
+               afh_get_header(afhi, pard->audio_format_num, pard->map,
+                       pard->map_size, &header, &size);
+               if (size > 0) {
+                       PARA_INFO_LOG("writing header (%zu bytes)\n", size);
+                       buf = para_malloc(size);
+                       memcpy(buf, header, size);
+                       btr_add_output(buf, size, btrn);
+                       afh_free_header(header, pard->audio_format_num);
+               }
+       }
+       if (!conf->just_in_time_given) {
+               afh_get_chunk(pard->first_chunk, afhi, pard->map, &start, &size);
+               afh_get_chunk(pard->last_chunk, afhi, pard->map, &end, &size);
+               end += size;
+               PARA_INFO_LOG("adding %zu bytes\n", end - start);
+               btr_add_output_dont_free(start, end - start, btrn);
+               ret = -E_RECV_EOF;
+               goto out;
+       }
+       if (pard->current_chunk == pard->first_chunk)
+               pard->stream_start = *now;
+       else {
+               compute_chunk_time(pard->current_chunk - pard->first_chunk,
+                       &afhi->chunk_tv, &pard->stream_start, &chunk_time);
+               ret = tv_diff(&chunk_time, now, NULL);
+               if (ret > 0)
+                       goto out;
+       }
+       afh_get_chunk(pard->current_chunk, afhi, pard->map, &start, &size);
+       PARA_DEBUG_LOG("adding chunk %u\n", pard->current_chunk);
+       btr_add_output_dont_free(start, size, btrn);
+       if (pard->current_chunk >= pard->last_chunk) {
+               ret = -E_RECV_EOF;
+               goto out;
+       }
+       pard->current_chunk++;
+       ret = 1;
+out:
+       if (ret < 0) {
+               btr_remove_node(&rn->btrn);
+               pard->current_chunk = pard->first_chunk;
+       }
+       t->error = ret;
+}
 
 /**
  * The init function of the afh receiver.
 
 /**
  * The init function of the afh receiver.
@@ -24,15 +247,15 @@ void afh_recv_init(struct receiver *r)
 {
        struct afh_recv_args_info dummy;
 
 {
        struct afh_recv_args_info dummy;
 
+       afh_init();
        afh_recv_cmdline_parser_init(&dummy);
        afh_recv_cmdline_parser_init(&dummy);
-#if 0
        r->open = afh_recv_open;
        r->close = afh_recv_close;
        r->pre_select = afh_recv_pre_select;
        r->post_select = afh_recv_post_select;
        r->parse_config = afh_recv_parse_config;
        r->free_config = afh_recv_free_config;
        r->open = afh_recv_open;
        r->close = afh_recv_close;
        r->pre_select = afh_recv_pre_select;
        r->post_select = afh_recv_post_select;
        r->parse_config = afh_recv_parse_config;
        r->free_config = afh_recv_free_config;
-#endif
+       r->execute = afh_execute;
        r->help = (struct ggo_help) {
                .short_help = afh_recv_args_info_help,
                .detailed_help = afh_recv_args_info_detailed_help
        r->help = (struct ggo_help) {
                .short_help = afh_recv_args_info_help,
                .detailed_help = afh_recv_args_info_detailed_help
diff --git a/error.h b/error.h
index 1e624667d7dfe51af441ca48df8e2b27f81a5500..43d9eef2524ccefefd486d4166c8d1a234f9d514 100644 (file)
--- a/error.h
+++ b/error.h
@@ -33,7 +33,6 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define FILE_WRITE_ERRORS
 #define STDIN_ERRORS
 #define WRITE_ERRORS
 #define FILE_WRITE_ERRORS
 #define STDIN_ERRORS
 #define WRITE_ERRORS
-#define AFH_RECV_ERRORS
 
 extern const char **para_errlist[];
 
 
 extern const char **para_errlist[];
 
@@ -69,6 +68,10 @@ extern const char **para_errlist[];
        PARA_ERROR(FLAC_STREAMINFO, "could not read stream info meta block"), \
 
 
        PARA_ERROR(FLAC_STREAMINFO, "could not read stream info meta block"), \
 
 
+#define AFH_RECV_ERRORS \
+       PARA_ERROR(AFH_RECV_BAD_FILENAME, "invalid file name"), \
+
+
 #define OGG_AFH_COMMON_ERRORS \
        PARA_ERROR(STREAM_PACKETOUT, "ogg stream packet-out error (first packet)"), \
        PARA_ERROR(SYNC_PAGEOUT, "ogg sync page-out error (no ogg file?)"), \
 #define OGG_AFH_COMMON_ERRORS \
        PARA_ERROR(STREAM_PACKETOUT, "ogg stream packet-out error (first packet)"), \
        PARA_ERROR(SYNC_PAGEOUT, "ogg sync page-out error (no ogg file?)"), \