uint8_t num_header_slices;
};
+enum fec_client_state {
+ FEC_STATE_NONE = 0, /**< not initialized and not enabled */
+ FEC_STATE_DISABLED, /**< temporarily disabled */
+ FEC_STATE_READY_TO_RUN /**< initialized and enabled */
+};
+
/**
* Describes one connected FEC client.
*/
struct fec_client {
- /** If negative, this client is temporarily disabled. */
- int error;
- /** Whether the sender client is ready to push out data. */
- bool ready;
+ /** Current state of the client */
+ enum fec_client_state state;
/** The connected sender client (transport layer). */
struct sender_client *sc;
/** Parameters requested by the client. */
int num_extra_slices;
/** Contains the FEC-encoded data. */
unsigned char *enc_buf;
- /** Pointer obtained from sender when the client is added. */
- void *private_data;
};
/**
return 1;
}
-static int num_slices(long unsigned bytes, struct fec_client *fc, uint8_t *result)
+static int num_slices(long unsigned bytes, struct fec_client *fc, int rs)
{
- unsigned long m = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE;
- unsigned rv, redundant_slices = fc->fcp->slices_per_group
- - fc->fcp->data_slices_per_group;
+ int m = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE;
+ int ret;
- if (!m)
- return -E_BAD_CT;
- rv = (bytes + m - 1) / m;
- if (rv + redundant_slices > 255)
+ assert(m > 0);
+ assert(rs > 0);
+ ret = (bytes + m - 1) / m;
+ if (ret + rs > 255)
return -E_BAD_CT;
- *result = rv;
- return 1;
+ return ret;
}
/* set group start and group duration */
static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
{
int ret, i, k, data_slices;
+ int hs, ds, rs; /* header/data/redundant slices */
size_t len;
const char *buf, *start_buf;
struct fec_group *g = &fc->group;
unsigned slice_bytes = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE;
uint32_t max_data_size;
+ rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group;
if (fc->first_stream_chunk < 0) {
- uint8_t hs, ds; /* needed header/data slices */
- uint8_t rs = fc->fcp->slices_per_group
- - fc->fcp->data_slices_per_group; /* redundant slices */
int n;
- ret = num_slices(vsst->header_len, fc, &hs);
+ ret = num_slices(vsst->header_len, fc, rs);
if (ret < 0)
return ret;
+ hs = ret;
ret = num_slices(afh_get_largest_chunk_size(&mmd->afd.afhi),
- fc, &ds);
+ fc, rs);
if (ret < 0)
return ret;
- k = (int)hs + ds;
+ ds = ret;
+ k = hs + ds;
if (k > 255)
return -E_BAD_CT;
if (k < fc->fcp->data_slices_per_group)
g->num++;
}
if (need_audio_header(fc, vsst)) {
- ret = num_slices(vsst->header_len, fc, &g->num_header_slices);
+ ret = num_slices(vsst->header_len, fc, rs);
if (ret < 0)
return ret;
+ g->num_header_slices = ret;
} else
g->num_header_slices = 0;
afh_get_chunk(g->first_chunk, &mmd->afd.afhi, vsst->map, &start_buf,
static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
{
- assert(fc->error >= 0);
if (fc->first_stream_chunk < 0 || fc->current_slice_num
== fc->fcp->slices_per_group + fc->num_extra_slices) {
int ret = setup_next_fec_group(fc, vsst);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
PARA_ERROR_LOG("FEC client temporarily disabled\n");
- fc->error = ret;
- return fc->error;
+ fc->state = FEC_STATE_DISABLED;
+ return ret;
}
}
write_fec_header(fc, vsst);
{
struct fec_client *fc;
- assert(vss_playing());
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval diff;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
if (next_slice_is_due(fc, &diff)) {
timeout->tv_sec = 0;
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval group_duration;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
tv_scale(fc->group.num_chunks, chunk_tv, &group_duration);
if (tv_diff(&timeout, &group_duration, NULL) < 0)
mmd->events++;
}
-/**
- * Get the list of all supported audio formats.
- *
- * \return Aa space separated list of all supported audio formats
- * It is not allocated at runtime, i.e. there is no need to free
- * the returned string in the caller.
- */
-const char *supported_audio_formats(void)
-{
- return SUPPORTED_AUDIO_FORMATS;
-}
-
static int need_to_request_new_audio_file(struct vss_task *vsst)
{
struct timeval diff;
static void vss_pre_select(struct sched *s, struct task *t)
{
int i;
- struct timeval *tv, diff;
+ struct timeval *tv;
struct vss_task *vsst = container_of(t, struct vss_task, task);
if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
senders[i].shutdown_clients();
list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
fc->first_stream_chunk = -1;
- fc->error = 0;
+ fc->state = FEC_STATE_NONE;
}
mmd->stream_start.tv_sec = 0;
mmd->stream_start.tv_usec = 0;
senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
}
tv = vss_compute_timeout(vsst);
- if (tv && tv_diff(tv, &s->timeout, &diff) < 0)
- s->timeout = *tv;
+ if (tv)
+ sched_request_timeout(tv, s);
}
static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
fc->num_extra_slices = 0;
fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
fc->next_header_time.tv_sec = 0;
- fc->ready = true;
+ fc->state = FEC_STATE_READY_TO_RUN;
return 1;
err:
fec_free(fc->parms);
&due, 1) < 0)
return;
list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
- if (fc->error < 0)
+ switch (fc->state) {
+ case FEC_STATE_DISABLED:
continue;
- if (!fc->ready) {
+ case FEC_STATE_NONE:
ret = initialize_fec_client(fc);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
continue;
}
+ /* fall through */
+ case FEC_STATE_READY_TO_RUN:
+ break;
}
if (!next_slice_is_due(fc, NULL)) {
fec_active = 1;
int num = mmd->sender_cmd_data.cmd_num,
sender_num = mmd->sender_cmd_data.sender_num;
- if (senders[sender_num].client_cmds[num])
- senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
+ if (senders[sender_num].client_cmds[num]) {
+ ret = senders[sender_num].client_cmds[num]
+ (&mmd->sender_cmd_data);
+ if (ret < 0)
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ }
mmd->sender_cmd_data.cmd_num = -1;
}
if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)