X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=625fabe766bb7707ac2a35125f7af0ce250f1cae;hp=cbc05d131b28550d8596cee1bb505a835ef81779;hb=71c6743e00d5741dc8eb60594575339543be0bcf;hpb=49f9b08933774469ffb92aba24ffe5da442b789a diff --git a/vss.c b/vss.c index cbc05d13..625fabe7 100644 --- a/vss.c +++ b/vss.c @@ -19,7 +19,9 @@ #include #include #include +#include +#include "server.lsg.h" #include "para.h" #include "error.h" #include "portable_io.h" @@ -29,7 +31,6 @@ #include "afs.h" #include "server.h" #include "net.h" -#include "server.cmdline.h" #include "list.h" #include "send.h" #include "sched.h" @@ -88,6 +89,8 @@ struct vss_task { enum afs_socket_status afsss; /** The memory mapped audio file. */ char *map; + /** The size of the memory mapping. */ + size_t mapsize; /** Used by the scheduler. */ struct task *task; /** Pointer to the header of the mapped audio file. */ @@ -96,6 +99,8 @@ struct vss_task { size_t header_len; /** Time between audio file headers are sent. */ struct timeval header_interval; + /* Only used if afh supports dynamic chunks. */ + void *afh_context; }; /** @@ -347,6 +352,8 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) static void vss_get_chunk(int chunk_num, struct vss_task *vsst, char **buf, size_t *sz) { + int ret; + /* * Chunk zero is special for header streams: It is the first portion of * the audio file which consists of the audio file header. It may be @@ -356,12 +363,20 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst, * rather than the unmodified header (chunk zero). */ if (chunk_num == 0 && vsst->header_len > 0) { + assert(vsst->header_buf); *buf = vsst->header_buf; /* stripped header */ *sz = vsst->header_len; return; } - afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)buf, - sz); + ret = afh_get_chunk(chunk_num, &mmd->afd.afhi, + mmd->afd.audio_format_id, vsst->map, vsst->mapsize, + (const char **)buf, sz, &vsst->afh_context); + if (ret < 0) { + PARA_WARNING_LOG("could not get chunk %d: %s\n", + chunk_num, para_strerror(-ret)); + *buf = NULL; + *sz = 0; + } } static void compute_group_size(struct vss_task *vsst, struct fec_group *g, @@ -611,7 +626,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) for (; i < k; i++) fc->src_data[i] = (const unsigned char *)buf; } - PARA_DEBUG_LOG("FEC group %d: %d chunks (%d - %d), %d bytes\n", + PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); @@ -846,7 +861,7 @@ static void vss_eof(struct vss_task *vsst) set_eof_barrier(vsst); afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); vsst->header_buf = NULL; - para_munmap(vsst->map, mmd->size); + para_munmap(vsst->map, vsst->mapsize); vsst->map = NULL; mmd->chunks_sent = 0; //mmd->offset = 0; @@ -855,7 +870,9 @@ static void vss_eof(struct vss_task *vsst) mmd->afd.afhi.chunk_tv.tv_usec = 0; free(mmd->afd.afhi.chunk_table); mmd->afd.afhi.chunk_table = NULL; - mmd->size = 0; + vsst->mapsize = 0; + afh_close(vsst->afh_context, mmd->afd.audio_format_id); + vsst->afh_context = NULL; mmd->events++; } @@ -905,7 +922,7 @@ static void vss_pre_select(struct sched *s, void *context) static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) { - char control[255], buf[8]; + char control[255] __a_aligned(8), buf[8]; struct msghdr msg = {.msg_iov = NULL}; struct cmsghdr *cmsg; struct iovec iov; @@ -973,11 +990,11 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) ret = -ERRNO_TO_PARA_ERROR(errno); goto err; } - mmd->size = statbuf.st_size; - ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, + ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, passed_fd, 0, &vsst->map); if (ret < 0) goto err; + vsst->mapsize = statbuf.st_size; close(passed_fd); mmd->chunks_sent = 0; mmd->current_chunk = 0; @@ -986,7 +1003,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) mmd->num_played++; mmd->new_vss_status_flags &= (~VSS_NEXT); afh_get_header(&mmd->afd.afhi, mmd->afd.audio_format_id, - vsst->map, mmd->size, &vsst->header_buf, &vsst->header_len); + vsst->map, vsst->mapsize, &vsst->header_buf, &vsst->header_len); return; err: free(mmd->afd.afhi.chunk_table); @@ -1015,8 +1032,7 @@ static void vss_send(struct vss_task *vsst) return; if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0) return; - if (chk_barrier("data send", &vsst->data_send_barrier, - &due, 1) < 0) + if (chk_barrier("data send", &vsst->data_send_barrier, &due, 1) < 0) return; list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { if (fc->state == FEC_STATE_DISABLED) @@ -1027,11 +1043,11 @@ static void vss_send(struct vss_task *vsst) } if (compute_next_fec_slice(fc, vsst) <= 0) continue; - PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, + PARA_DEBUG_LOG("sending %u:%u (%u bytes)\n", fc->group.num, fc->current_slice_num, fc->group.slice_bytes); + fc->current_slice_num++; fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, fc->group.slice_bytes + FEC_HEADER_SIZE); - fc->current_slice_num++; fec_active = 1; } if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */ @@ -1050,13 +1066,12 @@ static void vss_send(struct vss_task *vsst) mmd->events++; set_mmd_offset(); } - /* - * We call the send function also in case of empty chunks as - * they might have still some data queued which can be sent in - * this case. - */ vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); for (i = 0; senders[i].name; i++) { + /* + * We call ->send() even if len is zero because senders + * might have data queued which can be sent now. + */ if (!senders[i].send) continue; senders[i].send(mmd->current_chunk, mmd->chunks_sent, @@ -1076,7 +1091,7 @@ static void vss_send(struct vss_task *vsst) */ if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */ buf += len; - for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) { + for (i = 0; i < 5 && buf < vsst->map + vsst->mapsize; i++) { __a_unused volatile char x = *buf; buf += 4096; } @@ -1113,7 +1128,7 @@ static int vss_post_select(struct sched *s, void *context) set_eof_barrier(vsst); mmd->chunks_sent = 0; mmd->current_chunk = afh_get_start_chunk(mmd->repos_request, - &mmd->afd.afhi); + &mmd->afd.afhi, mmd->afd.audio_format_id); mmd->new_vss_status_flags &= ~VSS_REPOS; set_mmd_offset(); } @@ -1166,10 +1181,8 @@ void init_vss_task(int afs_socket, struct sched *s) static struct vss_task vss_task_struct, *vsst = &vss_task_struct; int i; char *hn = para_hostname(), *home = para_homedir(); - long unsigned announce_time = conf.announce_time_arg > 0? - conf.announce_time_arg : 300, - autoplay_delay = conf.autoplay_delay_arg > 0? - conf.autoplay_delay_arg : 0; + long unsigned announce_time = OPT_UINT32_VAL(ANNOUNCE_TIME), + autoplay_delay = OPT_UINT32_VAL(AUTOPLAY_DELAY); vsst->header_interval.tv_sec = 5; /* should this be configurable? */ vsst->afs_socket = afs_socket; ms2tv(announce_time, &vsst->announce_tv); @@ -1182,12 +1195,12 @@ void init_vss_task(int afs_socket, struct sched *s) free(hn); free(home); mmd->sender_cmd_data.cmd_num = -1; - if (conf.autoplay_given) { + if (OPT_GIVEN(AUTOPLAY)) { struct timeval tmp; mmd->vss_status_flags |= VSS_PLAYING; mmd->new_vss_status_flags |= VSS_PLAYING; ms2tv(autoplay_delay, &tmp); - tv_add(now, &tmp, &vsst->autoplay_barrier); + tv_add(clock_get_realtime(NULL), &tmp, &vsst->autoplay_barrier); tv_add(&vsst->autoplay_barrier, &vsst->announce_tv, &vsst->data_send_barrier); }