int num_extra_slices;
/** Contains the FEC-encoded data. */
unsigned char *enc_buf;
+ /** Maximal packet size. */
+ int mps;
};
/**
write_u32(buf + 14, g->bytes);
write_u8(buf + 18, fc->current_slice_num);
- write_u16(buf + 20, p->max_slice_bytes - FEC_HEADER_SIZE);
+ write_u16(buf + 20, fc->mps - FEC_HEADER_SIZE);
write_u8(buf + 22, g->first_chunk? 0 : 1);
write_u8(buf + 23, vsst->header_len? 1 : 0);
memset(buf + 24, 0, 7);
return 1;
}
-static int num_slices(long unsigned bytes, struct fec_client *fc, uint8_t *result)
+static int num_slices(long unsigned bytes, int mps, int rs)
{
- unsigned long m = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE;
- unsigned rv, redundant_slices = fc->fcp->slices_per_group
- - fc->fcp->data_slices_per_group;
+ int m = mps - FEC_HEADER_SIZE;
+ int ret;
- if (!m)
- return -E_BAD_CT;
- rv = (bytes + m - 1) / m;
- if (rv + redundant_slices > 255)
+ assert(m > 0);
+ assert(rs > 0);
+ ret = (bytes + m - 1) / m;
+ if (ret + rs > 255)
return -E_BAD_CT;
- *result = rv;
- return 1;
+ return ret;
}
/* set group start and group duration */
tv2ms(&g->duration), tv2ms(chunk_tv), tv2ms(&g->slice_duration));
}
+static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
+{
+ int k, n, ret, mps;
+ int hs, ds, rs; /* header/data/redundant slices */
+ struct fec_client_parms *fcp = fc->fcp;
+
+ /* set mps */
+ if (fcp->init_fec) {
+ /*
+ * Set the maximum slice size to the Maximum Packet Size if the
+ * transport protocol allows to determine this value. The user
+ * can specify a slice size up to this value.
+ */
+ ret = fcp->init_fec(fc->sc);
+ if (ret < 0)
+ return ret;
+ mps = ret;
+ } else
+ mps = generic_max_transport_msg_size(fc->sc->fd);
+ if (mps <= FEC_HEADER_SIZE)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+
+ rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group;
+ ret = num_slices(vsst->header_len, mps, rs);
+ if (ret < 0)
+ goto err;
+ hs = ret;
+ ret = num_slices(afh_get_largest_chunk_size(&mmd->afd.afhi),
+ mps, rs);
+ if (ret < 0)
+ goto err;
+ ds = ret;
+ k = ret + ds;
+ if (k < fc->fcp->data_slices_per_group)
+ k = fc->fcp->data_slices_per_group;
+ n = k + rs;
+ PARA_CRIT_LOG("hs: %d, ds: %d, rs: %d, k: %d, n: %d\n", hs, ds, rs, k, n);
+ fec_free(fc->parms);
+ ret = fec_new(k, n, &fc->parms);
+ if (ret < 0)
+ return ret;
+ fc->num_extra_slices = k - fc->fcp->data_slices_per_group;
+ PARA_NOTICE_LOG("fec parms %d:%d:%d (%d extra slices)\n",
+ mps, k, n, fc->num_extra_slices);
+ fc->src_data = para_realloc(fc->src_data, k * sizeof(char *));
+ fc->enc_buf = para_realloc(fc->enc_buf, mps);
+ memset(fc->enc_buf, 0, mps);
+ fc->extra_src_buf = para_realloc(fc->extra_src_buf, mps);
+ memset(fc->extra_src_buf, 0, mps);
+
+ fc->mps = mps;
+ fc->state = FEC_STATE_READY_TO_RUN;
+ fc->next_header_time.tv_sec = 0;
+ fc->stream_start = *now;
+ fc->first_stream_chunk = mmd->current_chunk;
+ return 1;
+err:
+ fec_free(fc->parms);
+ return ret;
+}
+
static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
{
- int ret, i, k, data_slices;
+ int ret, i, k, n, data_slices;
size_t len;
const char *buf, *start_buf;
struct fec_group *g = &fc->group;
- unsigned slice_bytes = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE;
+ unsigned slice_bytes;
uint32_t max_data_size;
- if (fc->first_stream_chunk < 0) {
- uint8_t hs, ds; /* needed header/data slices */
- uint8_t rs = fc->fcp->slices_per_group
- - fc->fcp->data_slices_per_group; /* redundant slices */
- int n;
-
- ret = num_slices(vsst->header_len, fc, &hs);
- if (ret < 0)
- return ret;
- ret = num_slices(afh_get_largest_chunk_size(&mmd->afd.afhi),
- fc, &ds);
- if (ret < 0)
- return ret;
- k = (int)hs + ds;
- if (k > 255)
- return -E_BAD_CT;
- if (k < fc->fcp->data_slices_per_group)
- k = fc->fcp->data_slices_per_group;
- n = k + rs;
- fc->num_extra_slices = k - fc->fcp->data_slices_per_group;
- PARA_NOTICE_LOG("fec parms %d:%d:%d (%d extra slices)\n",
- slice_bytes, k, n, fc->num_extra_slices);
- fec_free(fc->parms);
- fc->src_data = para_realloc(fc->src_data, k * sizeof(char *));
- ret = fec_new(k, n, &fc->parms);
+ if (fc->state == FEC_STATE_NONE) {
+ ret = initialize_fec_client(fc, vsst);
if (ret < 0)
return ret;
- fc->stream_start = *now;
- fc->first_stream_chunk = mmd->current_chunk;
g->first_chunk = mmd->current_chunk;
g->num = 0;
g->start = *now;
+
} else {
struct timeval tmp;
if (g->first_chunk + g->num_chunks >= mmd->afd.afhi.chunks_total)
*/
tmp = g->start;
tv_add(&tmp, &g->duration, &g->start);
- k = fc->fcp->data_slices_per_group + fc->num_extra_slices;
set_group_timing(fc, g);
g->first_chunk += g->num_chunks;
g->num++;
}
+ slice_bytes = fc->mps - FEC_HEADER_SIZE;
+ PARA_CRIT_LOG("slice_bytes: %d\n", slice_bytes);
+ k = fc->fcp->data_slices_per_group + fc->num_extra_slices;
+ n = fc->fcp->slices_per_group + fc->num_extra_slices;
+ PARA_CRIT_LOG("k: %d, n: %d\n", k, n);
if (need_audio_header(fc, vsst)) {
- ret = num_slices(vsst->header_len, fc, &g->num_header_slices);
+ ret = num_slices(vsst->header_len, slice_bytes, n - k);
if (ret < 0)
return ret;
+ g->num_header_slices = ret;
} else
g->num_header_slices = 0;
afh_get_chunk(g->first_chunk, &mmd->afd.afhi, vsst->map, &start_buf,
static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
{
- if (fc->first_stream_chunk < 0 || fc->current_slice_num
+ if (fc->state == FEC_STATE_NONE || fc->current_slice_num
== fc->fcp->slices_per_group + fc->num_extra_slices) {
int ret = setup_next_fec_group(fc, vsst);
if (ret == 0)
}
write_fec_header(fc, vsst);
fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
- fc->current_slice_num,
- fc->fcp->max_slice_bytes - FEC_HEADER_SIZE);
+ fc->current_slice_num, fc->mps - FEC_HEADER_SIZE);
return 1;
}
struct timeval tmp, next;
int ret;
- if (fc->first_stream_chunk < 0)
+ if (fc->state == FEC_STATE_NONE)
return 1;
tv_scale(fc->current_slice_num, &fc->group.slice_duration, &tmp);
tv_add(&tmp, &fc->group.start, &next);
mmd->events++;
}
-/**
- * Get the list of all supported audio formats.
- *
- * \return Aa space separated list of all supported audio formats
- * It is not allocated at runtime, i.e. there is no need to free
- * the returned string in the caller.
- */
-const char *supported_audio_formats(void)
-{
- return SUPPORTED_AUDIO_FORMATS;
-}
-
static int need_to_request_new_audio_file(struct vss_task *vsst)
{
struct timeval diff;
static void vss_pre_select(struct sched *s, struct task *t)
{
int i;
- struct timeval *tv, diff;
+ struct timeval *tv;
struct vss_task *vsst = container_of(t, struct vss_task, task);
if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
for (i = 0; senders[i].name; i++)
if (senders[i].shutdown_clients)
senders[i].shutdown_clients();
- list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
- fc->first_stream_chunk = -1;
+ list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
fc->state = FEC_STATE_NONE;
- }
mmd->stream_start.tv_sec = 0;
mmd->stream_start.tv_usec = 0;
}
senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
}
tv = vss_compute_timeout(vsst);
- if (tv && tv_diff(tv, &s->timeout, &diff) < 0)
- s->timeout = *tv;
+ if (tv)
+ sched_request_timeout(tv, s);
}
static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
mmd->new_vss_status_flags = VSS_NEXT;
}
-static int initialize_fec_client(struct fec_client *fc)
-{
- int ret;
- struct fec_client_parms *fcp = fc->fcp;
-
- if (fcp->init_fec) {
- /*
- * Set the maximum slice size to the Maximum Packet Size if the
- * transport protocol allows to determine this value. The user
- * can specify a slice size up to this value.
- */
- ret = fcp->init_fec(fc->sc);
- if (ret < 0)
- return ret;
- if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret)
- fcp->max_slice_bytes = ret;
- }
- if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
- return -ERRNO_TO_PARA_ERROR(EINVAL);
- ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
- &fc->parms);
- if (ret < 0)
- goto err;
- fc->first_stream_chunk = -1; /* stream not yet started */
- fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
- fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->num_extra_slices = 0;
- fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->next_header_time.tv_sec = 0;
- fc->state = FEC_STATE_READY_TO_RUN;
- return 1;
-err:
- fec_free(fc->parms);
- return ret;
-}
-
/**
* Main sending function.
*
*/
static void vss_send(struct vss_task *vsst)
{
- int ret, i, fec_active = 0;
+ int i, fec_active = 0;
struct timeval due;
struct fec_client *fc, *tmp_fc;
&due, 1) < 0)
return;
list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
- switch (fc->state) {
- case FEC_STATE_DISABLED:
+ if (fc->state == FEC_STATE_DISABLED)
continue;
- case FEC_STATE_NONE:
- ret = initialize_fec_client(fc);
- if (ret < 0) {
- PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- continue;
- }
- /* fall through */
- case FEC_STATE_READY_TO_RUN:
- break;
- }
if (!next_slice_is_due(fc, NULL)) {
fec_active = 1;
continue;
if (compute_next_fec_slice(fc, vsst) <= 0)
continue;
PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
- fc->current_slice_num, fc->fcp->max_slice_bytes);
- fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
- fc->fcp->max_slice_bytes);
+ fc->current_slice_num, fc->mps);
+ fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, fc->mps);
fc->current_slice_num++;
fec_active = 1;
}
int num = mmd->sender_cmd_data.cmd_num,
sender_num = mmd->sender_cmd_data.sender_num;
- if (senders[sender_num].client_cmds[num])
- senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
+ if (senders[sender_num].client_cmds[num]) {
+ ret = senders[sender_num].client_cmds[num]
+ (&mmd->sender_cmd_data);
+ if (ret < 0)
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ }
mmd->sender_cmd_data.cmd_num = -1;
}
if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)