X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=023b2c1d3cc8fcc0dcf23f0b5d2fc5c43487012f;hp=f9bf57b5575a7348fa5b5bb8a2db3c446d9dd92c;hb=refs%2Fheads%2Fpu;hpb=5d72d5880a6bd319a8ce2bf7dd2e4921938c77c2 diff --git a/vss.c b/vss.c index f9bf57b5..cd55851c 100644 --- a/vss.c +++ b/vss.c @@ -28,8 +28,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "ipc.h" #include "fd.h" @@ -43,7 +43,7 @@ const struct sender * const senders[] = { enum afs_socket_status { /** Socket is inactive. */ AFS_SOCKET_READY, - /** Socket fd was included in the write fd set for select(). */ + /** Socket fd was monitored for writing. */ AFS_SOCKET_CHECK_FOR_WRITE, /** vss wrote a request to the socket and waits for reply from afs. */ AFS_SOCKET_AFD_PENDING @@ -335,10 +335,10 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - fc->src_data = para_malloc(k * sizeof(char *)); + fc->src_data = arr_alloc(k, sizeof(char *)); for (i = 0; i < k; i++) - fc->src_data[i] = para_malloc(fc->mps); - fc->enc_buf = para_malloc(fc->mps); + fc->src_data[i] = alloc(fc->mps); + fc->enc_buf = alloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -671,7 +671,7 @@ size_t vss_get_fec_eof_packet(const char **buf) struct fec_client *vss_add_fec_client(struct sender_client *sc, struct fec_client_parms *fcp) { - struct fec_client *fc = para_calloc(sizeof(*fc)); + struct fec_client *fc = zalloc(sizeof(*fc)); fc->sc = sc; fc->fcp = fcp; @@ -827,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) if (sched_request_barrier(&vsst->data_send_barrier, s) == 1) return; /* - * Compute the select timeout as the minimal time until the next + * Compute the I/O timeout as the minimal time until the next * chunk/slice is due for any client. */ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, @@ -892,21 +892,21 @@ static void set_mmd_offset(void) mmd->offset = tv2ms(&offset); } -static void vss_pre_select(struct sched *s, void *context) +static void vss_pre_monitor(struct sched *s, void *context) { int i; struct vss_task *vsst = context; if (need_to_request_new_audio_file(vsst)) { PARA_DEBUG_LOG("ready and playing, but no audio file\n"); - para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno); + sched_monitor_writefd(vsst->afs_socket, s); vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else - para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); + sched_monitor_readfd(vsst->afs_socket, s); FOR_EACH_SENDER(i) { - if (!senders[i]->pre_select) + if (!senders[i]->pre_monitor) continue; - senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_monitor(s); } vss_compute_timeout(s, vsst); } @@ -950,13 +950,13 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) #define MAP_POPULATE 0 #endif -static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) +static void recv_afs_result(struct vss_task *vsst, const struct sched *s) { int ret, passed_fd, shmid; uint32_t afs_code = 0, afs_data = 0; struct stat statbuf; - if (!FD_ISSET(vsst->afs_socket, rfds)) + if (!sched_read_ok(vsst->afs_socket, s)) return; ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data); if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN)) @@ -1014,13 +1014,8 @@ err: } /** - * Main sending function. - * - * This function gets called from vss_post_select(). It checks whether the next - * chunk of data should be pushed out. It obtains a pointer to the data to be - * sent out as well as its length from mmd->afd.afhi. This information is then - * passed to each supported sender's send() function as well as to the send() - * functions of each registered fec client. + * If the next chunk needs to be sent, pass a pointer to the chunk data to all + * registered fec clients and to each sender's ->send() method. */ static void vss_send(struct vss_task *vsst) { @@ -1087,7 +1082,7 @@ static void vss_send(struct vss_task *vsst) mmd->current_chunk++; } -static int vss_post_select(struct sched *s, void *context) +static int vss_post_monitor(struct sched *s, void *context) { int ret, i; struct vss_task *vsst = context; @@ -1137,8 +1132,8 @@ static int vss_post_select(struct sched *s, void *context) mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) - recv_afs_result(vsst, &s->rfds); - else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { + recv_afs_result(vsst, s); + else if (sched_write_ok(vsst->afs_socket, s)) { PARA_INFO_LOG("requesting new fd from afs\n"); ret = write_buffer(vsst->afs_socket, "new"); if (ret < 0) @@ -1147,9 +1142,9 @@ static int vss_post_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_AFD_PENDING; } FOR_EACH_SENDER(i) { - if (!senders[i]->post_select) + if (!senders[i]->post_monitor) continue; - senders[i]->post_select(&s->rfds, &s->wfds); + senders[i]->post_monitor(s); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1194,8 +1189,8 @@ void vss_init(int afs_socket, struct sched *s) } vsst->task = task_register(&(struct task_info) { .name = "vss", - .pre_select = vss_pre_select, - .post_select = vss_post_select, + .pre_monitor = vss_pre_monitor, + .post_monitor = vss_post_monitor, .context = vsst, }, s); }