#include "net.h"
#include "server.cmdline.h"
#include "list.h"
-#include "vss.h"
#include "send.h"
+#include "vss.h"
#include "ipc.h"
#include "fd.h"
#include "sched.h"
uint8_t num_header_slices;
};
+enum fec_client_state {
+ FEC_STATE_NONE = 0, /**< not initialized and not enabled */
+ FEC_STATE_DISABLED, /**< temporarily disabled */
+ FEC_STATE_READY_TO_RUN /**< initialized and enabled */
+};
+
/**
* Describes one connected FEC client.
*/
struct fec_client {
- /** If negative, this client is temporarily disabled. */
- int error;
+ /** Current state of the client */
+ enum fec_client_state state;
+ /** The connected sender client (transport layer). */
+ struct sender_client *sc;
/** Parameters requested by the client. */
struct fec_client_parms *fcp;
/** Used by the core FEC code. */
static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
{
- assert(fc->error >= 0);
if (fc->first_stream_chunk < 0 || fc->current_slice_num
== fc->fcp->slices_per_group + fc->num_extra_slices) {
int ret = setup_next_fec_group(fc, vsst);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
PARA_ERROR_LOG("FEC client temporarily disabled\n");
- fc->error = ret;
- return fc->error;
+ fc->state = FEC_STATE_DISABLED;
+ return ret;
}
}
write_fec_header(fc, vsst);
/**
* Add one entry to the list of active fec clients.
*
- * \param fcp Describes the fec parameters to be used for this client.
- * \param result An opaque pointer that must be used by remove the client later.
+ * \param sc Generic sender_client data of the transport layer.
+ * \param fcp FEC parameters as supplied by the transport layer.
*
- * \return Standard.
+ * \return Newly allocated fec_client struct.
*/
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+struct fec_client *vss_add_fec_client(struct sender_client *sc,
+ struct fec_client_parms *fcp)
{
- int ret;
- struct fec_client *fc;
+ struct fec_client *fc = para_calloc(sizeof(*fc));
- if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
- return -ERRNO_TO_PARA_ERROR(EINVAL);
- fc = para_calloc(sizeof(*fc));
+ fc->sc = sc;
fc->fcp = fcp;
- ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
- &fc->parms);
- if (ret < 0)
- goto err;
- fc->first_stream_chunk = -1; /* stream not yet started */
- fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
- fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->num_extra_slices = 0;
- fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->next_header_time.tv_sec = 0;
para_list_add(&fc->node, &fec_client_list);
- *result = fc;
- return 1;
-err:
- fec_free(fc->parms);
- free(fc);
- *result = NULL;
- return ret;
+ return fc;
}
/**
{
struct fec_client *fc;
- assert(vss_playing());
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval diff;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
if (next_slice_is_due(fc, &diff)) {
timeout->tv_sec = 0;
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval group_duration;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
tv_scale(fc->group.num_chunks, chunk_tv, &group_duration);
if (tv_diff(&timeout, &group_duration, NULL) < 0)
senders[i].shutdown_clients();
list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
fc->first_stream_chunk = -1;
- fc->error = 0;
+ fc->state = FEC_STATE_NONE;
}
mmd->stream_start.tv_sec = 0;
mmd->stream_start.tv_usec = 0;
mmd->new_vss_status_flags = VSS_NEXT;
}
+static int initialize_fec_client(struct fec_client *fc)
+{
+ int ret;
+ struct fec_client_parms *fcp = fc->fcp;
+
+ if (fcp->init_fec) {
+ /*
+ * Set the maximum slice size to the Maximum Packet Size if the
+ * transport protocol allows to determine this value. The user
+ * can specify a slice size up to this value.
+ */
+ ret = fcp->init_fec(fc->sc);
+ if (ret < 0)
+ return ret;
+ if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret)
+ fcp->max_slice_bytes = ret;
+ }
+ if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+ &fc->parms);
+ if (ret < 0)
+ goto err;
+ fc->first_stream_chunk = -1; /* stream not yet started */
+ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+ fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->num_extra_slices = 0;
+ fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->next_header_time.tv_sec = 0;
+ fc->state = FEC_STATE_READY_TO_RUN;
+ return 1;
+err:
+ fec_free(fc->parms);
+ return ret;
+}
+
/**
* Main sending function.
*
*/
static void vss_send(struct vss_task *vsst)
{
- int i, fec_active = 0;
+ int ret, i, fec_active = 0;
struct timeval due;
struct fec_client *fc, *tmp_fc;
&due, 1) < 0)
return;
list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
- if (fc->error < 0)
+ switch (fc->state) {
+ case FEC_STATE_DISABLED:
continue;
+ case FEC_STATE_NONE:
+ ret = initialize_fec_client(fc);
+ if (ret < 0) {
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ continue;
+ }
+ /* fall through */
+ case FEC_STATE_READY_TO_RUN:
+ break;
+ }
if (!next_slice_is_due(fc, NULL)) {
fec_active = 1;
continue;
continue;
PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
fc->current_slice_num, fc->fcp->max_slice_bytes);
- fc->fcp->send((char *)fc->enc_buf,
- fc->fcp->max_slice_bytes,
- fc->fcp->private_data);
+ fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
+ fc->fcp->max_slice_bytes);
fc->current_slice_num++;
fec_active = 1;
}