X-Git-Url: http://git.tuebingen.mpg.de/?a=blobdiff_plain;f=vss.c;h=afde0dcb829b42422310eaf33361b990bb98c2ac;hb=28f8405e062fcff1f0ce90eb01ffeaca299cffa7;hp=898180c013a4845481bbfd48782fa9168f8e9048;hpb=3746051fdb11bf567bd9870080ba7a5f44dbadd8;p=paraslash.git diff --git a/vss.c b/vss.c index 898180c0..afde0dcb 100644 --- a/vss.c +++ b/vss.c @@ -26,8 +26,8 @@ #include "net.h" #include "server.cmdline.h" #include "list.h" -#include "vss.h" #include "send.h" +#include "vss.h" #include "ipc.h" #include "fd.h" #include "sched.h" @@ -137,6 +137,8 @@ struct fec_group { struct fec_client { /** If negative, this client is temporarily disabled. */ int error; + /** UDP or DCCP. */ + struct sender *sender; /** Parameters requested by the client. */ struct fec_client_parms *fcp; /** Used by the core FEC code. */ @@ -161,6 +163,8 @@ struct fec_client { int num_extra_slices; /** Contains the FEC-encoded data. */ unsigned char *enc_buf; + /** Pointer obtained from sender when the client is added. */ + void *private_data; }; /** @@ -413,33 +417,16 @@ size_t vss_get_fec_eof_packet(const char **buf) * * \return Standard. */ -int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +int vss_add_fec_client(struct sender *sender, void *private_data, + struct fec_client **result) { - int ret; - struct fec_client *fc; + struct fec_client *fc = para_calloc(sizeof(*fc)); - if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) - return -ERRNO_TO_PARA_ERROR(EINVAL); - fc = para_calloc(sizeof(*fc)); - fc->fcp = fcp; - ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, - &fc->parms); - if (ret < 0) - goto err; - fc->first_stream_chunk = -1; /* stream not yet started */ - fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); - fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->num_extra_slices = 0; - fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->next_header_time.tv_sec = 0; + fc->private_data = private_data; + fc->sender = sender; para_list_add(&fc->node, &fec_client_list); *result = fc; return 1; -err: - fec_free(fc->parms); - free(fc); - *result = NULL; - return ret; } /** @@ -713,6 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t) list_for_each_entry_safe(fc, tmp, &fec_client_list, node) { fc->first_stream_chunk = -1; fc->error = 0; + fc->fcp = NULL; } mmd->stream_start.tv_sec = 0; mmd->stream_start.tv_usec = 0; @@ -837,6 +825,35 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } +static int open_fec_client(struct fec_client *fc) +{ + int ret; + struct fec_client_parms *fcp; + + ret = fc->sender->open(fc->private_data, &fc->fcp); + if (ret < 0) { + fc->fcp = NULL; + return ret; + } + fcp = fc->fcp; + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->num_extra_slices = 0; + fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->next_header_time.tv_sec = 0; + return 1; +err: + fec_free(fc->parms); + return ret; +} + /** * Main sending function. * @@ -848,7 +865,7 @@ err: */ static void vss_send(struct vss_task *vsst) { - int i, fec_active = 0; + int ret, i, fec_active = 0; struct timeval due; struct fec_client *fc, *tmp_fc; @@ -862,6 +879,13 @@ static void vss_send(struct vss_task *vsst) list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { if (fc->error < 0) continue; + if (!fc->fcp) { + ret = open_fec_client(fc); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + continue; + } + } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; continue; @@ -870,9 +894,9 @@ static void vss_send(struct vss_task *vsst) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, fc->current_slice_num, fc->fcp->max_slice_bytes); - fc->fcp->send((char *)fc->enc_buf, + fc->sender->send_fec((char *)fc->enc_buf, fc->fcp->max_slice_bytes, - fc->fcp->private_data); + fc->private_data); fc->current_slice_num++; fec_active = 1; }