#include "net.h"
#include "server.h"
#include "list.h"
-#include "send.h"
#include "sched.h"
+#include "send.h"
#include "vss.h"
#include "ipc.h"
#include "fd.h"
enum afs_socket_status {
/** Socket is inactive. */
AFS_SOCKET_READY,
- /** Socket fd was included in the write fd set for select(). */
+ /** Socket fd was monitored for writing. */
AFS_SOCKET_CHECK_FOR_WRITE,
/** vss wrote a request to the socket and waits for reply from afs. */
AFS_SOCKET_AFD_PENDING
ret = fec_new(k, n, &fc->parms);
if (ret < 0)
return ret;
- fc->src_data = para_malloc(k * sizeof(char *));
+ fc->src_data = arr_alloc(k, sizeof(char *));
for (i = 0; i < k; i++)
- fc->src_data[i] = para_malloc(fc->mps);
- fc->enc_buf = para_malloc(fc->mps);
+ fc->src_data[i] = alloc(fc->mps);
+ fc->enc_buf = alloc(fc->mps);
fc->state = FEC_STATE_READY_TO_RUN;
fc->next_header_time.tv_sec = 0;
}
static int vss_get_chunk(int chunk_num, struct vss_task *vsst,
- char **buf, size_t *sz)
+ char **buf, uint32_t *len)
{
int ret;
if (chunk_num == 0 && vsst->header_len > 0) {
assert(vsst->header_buf);
*buf = vsst->header_buf; /* stripped header */
- *sz = vsst->header_len;
+ *len = vsst->header_len;
return 0;
}
ret = afh_get_chunk(chunk_num, &mmd->afd.afhi,
mmd->afd.audio_format_id, vsst->map, vsst->mapsize,
- (const char **)buf, sz, &vsst->afh_context);
+ (const char **)buf, len, &vsst->afh_context);
if (ret < 0) {
*buf = NULL;
- *sz = 0;
+ *len = 0;
}
return ret;
}
int max_bytes)
{
char *buf;
- size_t len;
+ uint32_t len;
int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
if (g->first_chunk == 0) {
slice_copied = 0;
for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) {
char *buf;
- size_t src_len;
+ uint32_t src_len;
ret = vss_get_chunk(c, vsst, &buf, &src_len);
if (ret < 0)
return ret;
struct fec_client *vss_add_fec_client(struct sender_client *sc,
struct fec_client_parms *fcp)
{
- struct fec_client *fc = para_calloc(sizeof(*fc));
+ struct fec_client *fc = zalloc(sizeof(*fc));
fc->sc = sc;
fc->fcp = fcp;
if (sched_request_barrier(&vsst->data_send_barrier, s) == 1)
return;
/*
- * Compute the select timeout as the minimal time until the next
+ * Compute the I/O timeout as the minimal time until the next
* chunk/slice is due for any client.
*/
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
static void vss_eof(struct vss_task *vsst)
{
-
if (!vsst->map)
return;
if (mmd->new_vss_status_flags & VSS_NOMORE)
para_munmap(vsst->map, vsst->mapsize);
vsst->map = NULL;
mmd->chunks_sent = 0;
- //mmd->offset = 0;
mmd->afd.afhi.seconds_total = 0;
+ mmd->afd.afhi.chunks_total = 0;
mmd->afd.afhi.chunk_tv.tv_sec = 0;
mmd->afd.afhi.chunk_tv.tv_usec = 0;
free(mmd->afd.afhi.chunk_table);
mmd->offset = tv2ms(&offset);
}
-static void vss_pre_select(struct sched *s, void *context)
+static void vss_pre_monitor(struct sched *s, void *context)
{
int i;
struct vss_task *vsst = context;
if (need_to_request_new_audio_file(vsst)) {
PARA_DEBUG_LOG("ready and playing, but no audio file\n");
- para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
+ sched_monitor_writefd(vsst->afs_socket, s);
vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
} else
- para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
+ sched_monitor_readfd(vsst->afs_socket, s);
FOR_EACH_SENDER(i) {
- if (!senders[i]->pre_select)
+ if (!senders[i]->pre_monitor)
continue;
- senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds);
+ senders[i]->pre_monitor(s);
}
vss_compute_timeout(s, vsst);
}
#define MAP_POPULATE 0
#endif
-static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
+static void recv_afs_result(struct vss_task *vsst, const struct sched *s)
{
int ret, passed_fd, shmid;
uint32_t afs_code = 0, afs_data = 0;
struct stat statbuf;
- if (!FD_ISSET(vsst->afs_socket, rfds))
+ if (!sched_read_ok(vsst->afs_socket, s))
return;
ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
/**
* Main sending function.
*
- * This function gets called from vss_post_select(). It checks whether the next
+ * This function gets called from vss_post_monitor(). It checks whether the next
* chunk of data should be pushed out. It obtains a pointer to the data to be
* sent out as well as its length from mmd->afd.afhi. This information is then
* passed to each supported sender's send() function as well as to the send()
struct timeval due;
struct fec_client *fc, *tmp_fc;
char *buf;
- size_t len;
+ uint32_t len;
if (!vsst->map || !vss_playing())
return;
mmd->current_chunk++;
}
-static int vss_post_select(struct sched *s, void *context)
+static int vss_post_monitor(struct sched *s, void *context)
{
int ret, i;
struct vss_task *vsst = context;
mmd->sender_cmd_data.cmd_num = -1;
}
if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
- recv_afs_result(vsst, &s->rfds);
- else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
- PARA_NOTICE_LOG("requesting new fd from afs\n");
+ recv_afs_result(vsst, s);
+ else if (sched_write_ok(vsst->afs_socket, s)) {
+ PARA_INFO_LOG("requesting new fd from afs\n");
ret = write_buffer(vsst->afs_socket, "new");
if (ret < 0)
PARA_CRIT_LOG("%s\n", para_strerror(-ret));
vsst->afsss = AFS_SOCKET_AFD_PENDING;
}
FOR_EACH_SENDER(i) {
- if (!senders[i]->post_select)
+ if (!senders[i]->post_monitor)
continue;
- senders[i]->post_select(&s->rfds, &s->wfds);
+ senders[i]->post_monitor(s);
}
if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
(vss_next() && vss_playing()))
vsst->afs_socket = afs_socket;
ms2tv(announce_time, &vsst->announce_tv);
PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
- INIT_LIST_HEAD(&fec_client_list);
+ init_list_head(&fec_client_list);
FOR_EACH_SENDER(i) {
PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name);
senders[i]->init();
}
vsst->task = task_register(&(struct task_info) {
.name = "vss",
- .pre_select = vss_pre_select,
- .post_select = vss_post_select,
+ .pre_monitor = vss_pre_monitor,
+ .post_monitor = vss_post_monitor,
.context = vsst,
}, s);
}
void vss_shutdown(void)
{
int i;
+ bool is_command_handler = process_is_command_handler();
FOR_EACH_SENDER(i) {
if (!senders[i]->shutdown)
continue;
- PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name);
+ if (!is_command_handler)
+ PARA_NOTICE_LOG("shutting down %s sender\n",
+ senders[i]->name);
senders[i]->shutdown();
}
}