Remove some unused error codes.
[paraslash.git] / vss.c
diff --git a/vss.c b/vss.c
index bfb0f0a175dc5da0e79bad5ab0497d6f1faff2f2..adc0cb63cea1204d3bfef811c492b807b74001c2 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -26,8 +26,8 @@
 #include "net.h"
 #include "server.cmdline.h"
 #include "list.h"
-#include "vss.h"
 #include "send.h"
+#include "vss.h"
 #include "ipc.h"
 #include "fd.h"
 #include "sched.h"
@@ -131,12 +131,20 @@ struct fec_group {
        uint8_t num_header_slices;
 };
 
+enum fec_client_state {
+       FEC_STATE_NONE = 0,     /**< not initialized and not enabled */
+       FEC_STATE_DISABLED,     /**< temporarily disabled */
+       FEC_STATE_READY_TO_RUN  /**< initialized and enabled */
+};
+
 /**
  * Describes one connected FEC client.
  */
 struct fec_client {
-       /** If negative, this client is temporarily disabled. */
-       int error;
+       /** Current state of the client */
+       enum fec_client_state state;
+       /** The connected sender client (transport layer). */
+       struct sender_client *sc;
        /** Parameters requested by the client. */
        struct fec_client_parms *fcp;
        /** Used by the core FEC code. */
@@ -369,7 +377,6 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
 
 static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
 {
-       assert(fc->error >= 0);
        if (fc->first_stream_chunk < 0 || fc->current_slice_num
                        == fc->fcp->slices_per_group + fc->num_extra_slices) {
                int ret = setup_next_fec_group(fc, vsst);
@@ -378,8 +385,8 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
                if (ret < 0) {
                        PARA_ERROR_LOG("%s\n", para_strerror(-ret));
                        PARA_ERROR_LOG("FEC client temporarily disabled\n");
-                       fc->error = ret;
-                       return fc->error;
+                       fc->state = FEC_STATE_DISABLED;
+                       return ret;
                }
        }
        write_fec_header(fc, vsst);
@@ -408,38 +415,20 @@ size_t vss_get_fec_eof_packet(const char **buf)
 /**
  * Add one entry to the list of active fec clients.
  *
- * \param fcp Describes the fec parameters to be used for this client.
- * \param result An opaque pointer that must be used by remove the client later.
+ * \param sc  Generic sender_client data of the transport layer.
+ * \param fcp FEC parameters as supplied by the transport layer.
  *
- * \return Standard.
+ * \return Newly allocated fec_client struct.
  */
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+struct fec_client *vss_add_fec_client(struct sender_client *sc,
+                                     struct fec_client_parms *fcp)
 {
-       int ret;
-       struct fec_client *fc;
+       struct fec_client *fc = para_calloc(sizeof(*fc));
 
-       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
-               return -ERRNO_TO_PARA_ERROR(EINVAL);
-       fc = para_calloc(sizeof(*fc));
+       fc->sc  = sc;
        fc->fcp = fcp;
-       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
-               &fc->parms);
-       if (ret < 0)
-               goto err;
-       fc->first_stream_chunk = -1; /* stream not yet started */
-       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
-       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->num_extra_slices = 0;
-       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->next_header_time.tv_sec = 0;
        para_list_add(&fc->node, &fec_client_list);
-       *result = fc;
-       return 1;
-err:
-       fec_free(fc->parms);
-       free(fc);
-       *result = NULL;
-       return ret;
+       return fc;
 }
 
 /**
@@ -481,11 +470,10 @@ static void compute_slice_timeout(struct timeval *timeout)
 {
        struct fec_client *fc;
 
-       assert(vss_playing());
        list_for_each_entry(fc, &fec_client_list, node) {
                struct timeval diff;
 
-               if (fc->error < 0)
+               if (fc->state != FEC_STATE_READY_TO_RUN)
                        continue;
                if (next_slice_is_due(fc, &diff)) {
                        timeout->tv_sec = 0;
@@ -508,7 +496,7 @@ static void set_eof_barrier(struct vss_task *vsst)
        list_for_each_entry(fc, &fec_client_list, node) {
                struct timeval group_duration;
 
-               if (fc->error < 0)
+               if (fc->state != FEC_STATE_READY_TO_RUN)
                        continue;
                tv_scale(fc->group.num_chunks, chunk_tv, &group_duration);
                if (tv_diff(&timeout, &group_duration, NULL) < 0)
@@ -712,7 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t)
                                senders[i].shutdown_clients();
                list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
                        fc->first_stream_chunk = -1;
-                       fc->error = 0;
+                       fc->state = FEC_STATE_NONE;
                }
                mmd->stream_start.tv_sec = 0;
                mmd->stream_start.tv_usec = 0;
@@ -781,16 +769,20 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
        return 1;
 }
 
-static void recv_afs_result(struct vss_task *vsst)
+static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
 {
        int ret, passed_fd, shmid;
        uint32_t afs_code = 0, afs_data = 0;
        struct stat statbuf;
 
-       vsst->afsss = AFS_SOCKET_READY;
+       if (!FD_ISSET(vsst->afs_socket, rfds))
+               return;
        ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
+       if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
+               return;
        if (ret < 0)
                goto err;
+       vsst->afsss = AFS_SOCKET_READY;
        PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code,
                afs_data);
        ret = -E_NOFD;
@@ -833,6 +825,42 @@ err:
        mmd->new_vss_status_flags = VSS_NEXT;
 }
 
+static int initialize_fec_client(struct fec_client *fc)
+{
+       int ret;
+       struct fec_client_parms *fcp = fc->fcp;
+
+       if (fcp->init_fec) {
+               /*
+                * Set the maximum slice size to the Maximum Packet Size if the
+                * transport protocol allows to determine this value. The user
+                * can specify a slice size up to this value.
+                */
+               ret = fcp->init_fec(fc->sc);
+               if (ret < 0)
+                       return ret;
+               if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret)
+                       fcp->max_slice_bytes = ret;
+       }
+       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+               return -ERRNO_TO_PARA_ERROR(EINVAL);
+       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+               &fc->parms);
+       if (ret < 0)
+               goto err;
+       fc->first_stream_chunk = -1; /* stream not yet started */
+       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->num_extra_slices = 0;
+       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->next_header_time.tv_sec = 0;
+       fc->state = FEC_STATE_READY_TO_RUN;
+       return 1;
+err:
+       fec_free(fc->parms);
+       return ret;
+}
+
 /**
  * Main sending function.
  *
@@ -844,7 +872,7 @@ err:
  */
 static void vss_send(struct vss_task *vsst)
 {
-       int i, fec_active = 0;
+       int ret, i, fec_active = 0;
        struct timeval due;
        struct fec_client *fc, *tmp_fc;
 
@@ -856,8 +884,19 @@ static void vss_send(struct vss_task *vsst)
                        &due, 1) < 0)
                return;
        list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
-               if (fc->error < 0)
+               switch (fc->state) {
+               case FEC_STATE_DISABLED:
                        continue;
+               case FEC_STATE_NONE:
+                       ret = initialize_fec_client(fc);
+                       if (ret < 0) {
+                               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+                               continue;
+                       }
+                       /* fall through */
+               case FEC_STATE_READY_TO_RUN:
+                       break;
+               }
                if (!next_slice_is_due(fc, NULL)) {
                        fec_active = 1;
                        continue;
@@ -866,9 +905,8 @@ static void vss_send(struct vss_task *vsst)
                        continue;
                PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
                        fc->current_slice_num, fc->fcp->max_slice_bytes);
-               fc->fcp->send((char *)fc->enc_buf,
-                       fc->fcp->max_slice_bytes,
-                       fc->fcp->private_data);
+               fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
+                                 fc->fcp->max_slice_bytes);
                fc->current_slice_num++;
                fec_active = 1;
        }
@@ -916,14 +954,17 @@ static void vss_post_select(struct sched *s, struct task *t)
                int num = mmd->sender_cmd_data.cmd_num,
                        sender_num = mmd->sender_cmd_data.sender_num;
 
-               if (senders[sender_num].client_cmds[num])
-                       senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
+               if (senders[sender_num].client_cmds[num]) {
+                       ret = senders[sender_num].client_cmds[num]
+                               (&mmd->sender_cmd_data);
+                       if (ret < 0)
+                               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+               }
                mmd->sender_cmd_data.cmd_num = -1;
        }
-       if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) {
-               if (FD_ISSET(vsst->afs_socket, &s->rfds))
-                       recv_afs_result(vsst);
-       } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
+       if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
+               recv_afs_result(vsst, &s->rfds);
+       else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
                PARA_NOTICE_LOG("requesting new fd from afs\n");
                ret = send_buffer(vsst->afs_socket, "new");
                if (ret < 0)