Merge branch 't/dccp_fec'
authorAndre Noll <maan@systemlinux.org>
Wed, 23 Jun 2010 18:55:08 +0000 (20:55 +0200)
committerAndre Noll <maan@systemlinux.org>
Wed, 23 Jun 2010 18:55:08 +0000 (20:55 +0200)
1  2 
vss.c

diff --combined vss.c
index 0eb235c72e711391beee1f417ca997e8c5d65870,e725eebdc94cdbe21a7d4ab3158c4ab89b311235..adc0cb63cea1204d3bfef811c492b807b74001c2
--- 1/vss.c
--- 2/vss.c
+++ b/vss.c
@@@ -26,8 -26,8 +26,8 @@@
  #include "net.h"
  #include "server.cmdline.h"
  #include "list.h"
- #include "vss.h"
  #include "send.h"
+ #include "vss.h"
  #include "ipc.h"
  #include "fd.h"
  #include "sched.h"
@@@ -131,12 -131,20 +131,20 @@@ struct fec_group 
        uint8_t num_header_slices;
  };
  
+ enum fec_client_state {
+       FEC_STATE_NONE = 0,     /**< not initialized and not enabled */
+       FEC_STATE_DISABLED,     /**< temporarily disabled */
+       FEC_STATE_READY_TO_RUN  /**< initialized and enabled */
+ };
  /**
   * Describes one connected FEC client.
   */
  struct fec_client {
-       /** If negative, this client is temporarily disabled. */
-       int error;
+       /** Current state of the client */
+       enum fec_client_state state;
+       /** The connected sender client (transport layer). */
+       struct sender_client *sc;
        /** Parameters requested by the client. */
        struct fec_client_parms *fcp;
        /** Used by the core FEC code. */
@@@ -369,7 -377,6 +377,6 @@@ static int setup_next_fec_group(struct 
  
  static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
  {
-       assert(fc->error >= 0);
        if (fc->first_stream_chunk < 0 || fc->current_slice_num
                        == fc->fcp->slices_per_group + fc->num_extra_slices) {
                int ret = setup_next_fec_group(fc, vsst);
                if (ret < 0) {
                        PARA_ERROR_LOG("%s\n", para_strerror(-ret));
                        PARA_ERROR_LOG("FEC client temporarily disabled\n");
-                       fc->error = ret;
-                       return fc->error;
+                       fc->state = FEC_STATE_DISABLED;
+                       return ret;
                }
        }
        write_fec_header(fc, vsst);
@@@ -408,38 -415,20 +415,20 @@@ size_t vss_get_fec_eof_packet(const cha
  /**
   * Add one entry to the list of active fec clients.
   *
-  * \param fcp Describes the fec parameters to be used for this client.
-  * \param result An opaque pointer that must be used by remove the client later.
+  * \param sc  Generic sender_client data of the transport layer.
+  * \param fcp FEC parameters as supplied by the transport layer.
   *
-  * \return Standard.
+  * \return Newly allocated fec_client struct.
   */
- int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+ struct fec_client *vss_add_fec_client(struct sender_client *sc,
+                                     struct fec_client_parms *fcp)
  {
-       int ret;
-       struct fec_client *fc;
+       struct fec_client *fc = para_calloc(sizeof(*fc));
  
-       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
-               return -ERRNO_TO_PARA_ERROR(EINVAL);
-       fc = para_calloc(sizeof(*fc));
+       fc->sc  = sc;
        fc->fcp = fcp;
-       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
-               &fc->parms);
-       if (ret < 0)
-               goto err;
-       fc->first_stream_chunk = -1; /* stream not yet started */
-       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
-       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->num_extra_slices = 0;
-       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
-       fc->next_header_time.tv_sec = 0;
        para_list_add(&fc->node, &fec_client_list);
-       *result = fc;
-       return 1;
- err:
-       fec_free(fc->parms);
-       free(fc);
-       *result = NULL;
-       return ret;
+       return fc;
  }
  
  /**
@@@ -481,11 -470,10 +470,10 @@@ static void compute_slice_timeout(struc
  {
        struct fec_client *fc;
  
-       assert(vss_playing());
        list_for_each_entry(fc, &fec_client_list, node) {
                struct timeval diff;
  
-               if (fc->error < 0)
+               if (fc->state != FEC_STATE_READY_TO_RUN)
                        continue;
                if (next_slice_is_due(fc, &diff)) {
                        timeout->tv_sec = 0;
@@@ -508,7 -496,7 +496,7 @@@ static void set_eof_barrier(struct vss_
        list_for_each_entry(fc, &fec_client_list, node) {
                struct timeval group_duration;
  
-               if (fc->error < 0)
+               if (fc->state != FEC_STATE_READY_TO_RUN)
                        continue;
                tv_scale(fc->group.num_chunks, chunk_tv, &group_duration);
                if (tv_diff(&timeout, &group_duration, NULL) < 0)
@@@ -712,7 -700,7 +700,7 @@@ static void vss_pre_select(struct sche
                                senders[i].shutdown_clients();
                list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
                        fc->first_stream_chunk = -1;
-                       fc->error = 0;
+                       fc->state = FEC_STATE_NONE;
                }
                mmd->stream_start.tv_sec = 0;
                mmd->stream_start.tv_usec = 0;
@@@ -837,6 -825,42 +825,42 @@@ err
        mmd->new_vss_status_flags = VSS_NEXT;
  }
  
+ static int initialize_fec_client(struct fec_client *fc)
+ {
+       int ret;
+       struct fec_client_parms *fcp = fc->fcp;
+       if (fcp->init_fec) {
+               /*
+                * Set the maximum slice size to the Maximum Packet Size if the
+                * transport protocol allows to determine this value. The user
+                * can specify a slice size up to this value.
+                */
+               ret = fcp->init_fec(fc->sc);
+               if (ret < 0)
+                       return ret;
+               if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret)
+                       fcp->max_slice_bytes = ret;
+       }
+       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+               return -ERRNO_TO_PARA_ERROR(EINVAL);
+       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+               &fc->parms);
+       if (ret < 0)
+               goto err;
+       fc->first_stream_chunk = -1; /* stream not yet started */
+       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+       fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->num_extra_slices = 0;
+       fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+       fc->next_header_time.tv_sec = 0;
+       fc->state = FEC_STATE_READY_TO_RUN;
+       return 1;
+ err:
+       fec_free(fc->parms);
+       return ret;
+ }
  /**
   * Main sending function.
   *
   */
  static void vss_send(struct vss_task *vsst)
  {
-       int i, fec_active = 0;
+       int ret, i, fec_active = 0;
        struct timeval due;
        struct fec_client *fc, *tmp_fc;
  
                        &due, 1) < 0)
                return;
        list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
-               if (fc->error < 0)
+               switch (fc->state) {
+               case FEC_STATE_DISABLED:
                        continue;
+               case FEC_STATE_NONE:
+                       ret = initialize_fec_client(fc);
+                       if (ret < 0) {
+                               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+                               continue;
+                       }
+                       /* fall through */
+               case FEC_STATE_READY_TO_RUN:
+                       break;
+               }
                if (!next_slice_is_due(fc, NULL)) {
                        fec_active = 1;
                        continue;
                        continue;
                PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
                        fc->current_slice_num, fc->fcp->max_slice_bytes);
-               fc->fcp->send((char *)fc->enc_buf,
-                       fc->fcp->max_slice_bytes,
-                       fc->fcp->private_data);
+               fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
+                                 fc->fcp->max_slice_bytes);
                fc->current_slice_num++;
                fec_active = 1;
        }
@@@ -920,12 -954,8 +954,12 @@@ static void vss_post_select(struct sche
                int num = mmd->sender_cmd_data.cmd_num,
                        sender_num = mmd->sender_cmd_data.sender_num;
  
 -              if (senders[sender_num].client_cmds[num])
 -                      senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
 +              if (senders[sender_num].client_cmds[num]) {
 +                      ret = senders[sender_num].client_cmds[num]
 +                              (&mmd->sender_cmd_data);
 +                      if (ret < 0)
 +                              PARA_ERROR_LOG("%s\n", para_strerror(-ret));
 +              }
                mmd->sender_cmd_data.cmd_num = -1;
        }
        if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)