]> git.tuebingen.mpg.de Git - paraslash.git/blobdiff - vss.c
paraslash 0.7.3
[paraslash.git] / vss.c
diff --git a/vss.c b/vss.c
index 9e2e32ca3b3bd7979b3c564ff1959dcbe7560446..cd55851c366cec61c0bf7bc9501609fc2fa8a6c5 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -28,8 +28,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "ipc.h"
 #include "fd.h"
@@ -43,7 +43,7 @@ const struct sender * const senders[] = {
 enum afs_socket_status {
        /** Socket is inactive. */
        AFS_SOCKET_READY,
-       /** Socket fd was included in the write fd set for select(). */
+       /** Socket fd was monitored for writing. */
        AFS_SOCKET_CHECK_FOR_WRITE,
        /** vss wrote a request to the socket and waits for reply from afs. */
        AFS_SOCKET_AFD_PENDING
@@ -335,10 +335,10 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
        ret = fec_new(k, n, &fc->parms);
        if (ret < 0)
                return ret;
-       fc->src_data = para_malloc(k * sizeof(char *));
+       fc->src_data = arr_alloc(k, sizeof(char *));
        for (i = 0; i < k; i++)
-               fc->src_data[i] = para_malloc(fc->mps);
-       fc->enc_buf = para_malloc(fc->mps);
+               fc->src_data[i] = alloc(fc->mps);
+       fc->enc_buf = alloc(fc->mps);
 
        fc->state = FEC_STATE_READY_TO_RUN;
        fc->next_header_time.tv_sec = 0;
@@ -348,7 +348,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
 }
 
 static int vss_get_chunk(int chunk_num, struct vss_task *vsst,
-               char **buf, size_t *sz)
+               char **buf, uint32_t *len)
 {
        int ret;
 
@@ -363,15 +363,15 @@ static int vss_get_chunk(int chunk_num, struct vss_task *vsst,
        if (chunk_num == 0 && vsst->header_len > 0) {
                assert(vsst->header_buf);
                *buf = vsst->header_buf; /* stripped header */
-               *sz = vsst->header_len;
+               *len = vsst->header_len;
                return 0;
        }
        ret = afh_get_chunk(chunk_num, &mmd->afd.afhi,
                mmd->afd.audio_format_id, vsst->map, vsst->mapsize,
-               (const char **)buf, sz, &vsst->afh_context);
+               (const char **)buf, len, &vsst->afh_context);
        if (ret < 0) {
                *buf = NULL;
-               *sz = 0;
+               *len = 0;
        }
        return ret;
 }
@@ -380,7 +380,7 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g,
                int max_bytes)
 {
        char *buf;
-       size_t len;
+       uint32_t len;
        int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
 
        if (g->first_chunk == 0) {
@@ -587,7 +587,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
        slice_copied = 0;
        for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) {
                char *buf;
-               size_t src_len;
+               uint32_t src_len;
                ret = vss_get_chunk(c, vsst, &buf, &src_len);
                if (ret < 0)
                        return ret;
@@ -671,7 +671,7 @@ size_t vss_get_fec_eof_packet(const char **buf)
 struct fec_client *vss_add_fec_client(struct sender_client *sc,
                                      struct fec_client_parms *fcp)
 {
-       struct fec_client *fc = para_calloc(sizeof(*fc));
+       struct fec_client *fc = zalloc(sizeof(*fc));
 
        fc->sc  = sc;
        fc->fcp = fcp;
@@ -827,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst)
        if (sched_request_barrier(&vsst->data_send_barrier, s) == 1)
                return;
        /*
-        * Compute the select timeout as the minimal time until the next
+        * Compute the I/O timeout as the minimal time until the next
         * chunk/slice is due for any client.
         */
        compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
@@ -845,7 +845,6 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst)
 
 static void vss_eof(struct vss_task *vsst)
 {
-
        if (!vsst->map)
                return;
        if (mmd->new_vss_status_flags & VSS_NOMORE)
@@ -856,8 +855,8 @@ static void vss_eof(struct vss_task *vsst)
        para_munmap(vsst->map, vsst->mapsize);
        vsst->map = NULL;
        mmd->chunks_sent = 0;
-       //mmd->offset = 0;
        mmd->afd.afhi.seconds_total = 0;
+       mmd->afd.afhi.chunks_total = 0;
        mmd->afd.afhi.chunk_tv.tv_sec = 0;
        mmd->afd.afhi.chunk_tv.tv_usec = 0;
        free(mmd->afd.afhi.chunk_table);
@@ -893,21 +892,21 @@ static void set_mmd_offset(void)
        mmd->offset = tv2ms(&offset);
 }
 
-static void vss_pre_select(struct sched *s, void *context)
+static void vss_pre_monitor(struct sched *s, void *context)
 {
        int i;
        struct vss_task *vsst = context;
 
        if (need_to_request_new_audio_file(vsst)) {
                PARA_DEBUG_LOG("ready and playing, but no audio file\n");
-               para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
+               sched_monitor_writefd(vsst->afs_socket, s);
                vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
        } else
-               para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(vsst->afs_socket, s);
        FOR_EACH_SENDER(i) {
-               if (!senders[i]->pre_select)
+               if (!senders[i]->pre_monitor)
                        continue;
-               senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds);
+               senders[i]->pre_monitor(s);
        }
        vss_compute_timeout(s, vsst);
 }
@@ -951,13 +950,13 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
 #define MAP_POPULATE 0
 #endif
 
-static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
+static void recv_afs_result(struct vss_task *vsst, const struct sched *s)
 {
        int ret, passed_fd, shmid;
        uint32_t afs_code = 0, afs_data = 0;
        struct stat statbuf;
 
-       if (!FD_ISSET(vsst->afs_socket, rfds))
+       if (!sched_read_ok(vsst->afs_socket, s))
                return;
        ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
        if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
@@ -1015,13 +1014,8 @@ err:
 }
 
 /**
- * Main sending function.
- *
- * This function gets called from vss_post_select(). It checks whether the next
- * chunk of data should be pushed out. It obtains a pointer to the data to be
- * sent out as well as its length from mmd->afd.afhi. This information is then
- * passed to each supported sender's send() function as well as to the send()
- * functions of each registered fec client.
+ * If the next chunk needs to be sent, pass a pointer to the chunk data to all
+ * registered fec clients and to each sender's ->send() method.
  */
 static void vss_send(struct vss_task *vsst)
 {
@@ -1030,7 +1024,7 @@ static void vss_send(struct vss_task *vsst)
        struct timeval due;
        struct fec_client *fc, *tmp_fc;
        char *buf;
-       size_t len;
+       uint32_t len;
 
        if (!vsst->map || !vss_playing())
                return;
@@ -1088,7 +1082,7 @@ static void vss_send(struct vss_task *vsst)
        mmd->current_chunk++;
 }
 
-static int vss_post_select(struct sched *s, void *context)
+static int vss_post_monitor(struct sched *s, void *context)
 {
        int ret, i;
        struct vss_task *vsst = context;
@@ -1138,9 +1132,9 @@ static int vss_post_select(struct sched *s, void *context)
                mmd->sender_cmd_data.cmd_num = -1;
        }
        if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
-               recv_afs_result(vsst, &s->rfds);
-       else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
-               PARA_NOTICE_LOG("requesting new fd from afs\n");
+               recv_afs_result(vsst, s);
+       else if (sched_write_ok(vsst->afs_socket, s)) {
+               PARA_INFO_LOG("requesting new fd from afs\n");
                ret = write_buffer(vsst->afs_socket, "new");
                if (ret < 0)
                        PARA_CRIT_LOG("%s\n", para_strerror(-ret));
@@ -1148,9 +1142,9 @@ static int vss_post_select(struct sched *s, void *context)
                        vsst->afsss = AFS_SOCKET_AFD_PENDING;
        }
        FOR_EACH_SENDER(i) {
-               if (!senders[i]->post_select)
+               if (!senders[i]->post_monitor)
                        continue;
-               senders[i]->post_select(&s->rfds, &s->wfds);
+               senders[i]->post_monitor(s);
        }
        if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
                        (vss_next() && vss_playing()))
@@ -1195,8 +1189,8 @@ void vss_init(int afs_socket, struct sched *s)
        }
        vsst->task = task_register(&(struct task_info) {
                .name = "vss",
-               .pre_select = vss_pre_select,
-               .post_select = vss_post_select,
+               .pre_monitor = vss_pre_monitor,
+               .post_monitor = vss_post_monitor,
                .context = vsst,
        }, s);
 }
@@ -1209,11 +1203,14 @@ void vss_init(int afs_socket, struct sched *s)
 void vss_shutdown(void)
 {
        int i;
+       bool is_command_handler = process_is_command_handler();
 
        FOR_EACH_SENDER(i) {
                if (!senders[i]->shutdown)
                        continue;
-               PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name);
+               if (!is_command_handler)
+                       PARA_NOTICE_LOG("shutting down %s sender\n",
+                               senders[i]->name);
                senders[i]->shutdown();
        }
 }