X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=023b2c1d3cc8fcc0dcf23f0b5d2fc5c43487012f;hp=b73562f9733a98e1bea760ea109f1a7cf883d426;hb=HEAD;hpb=96c1602f28d5ef601bad10941cf079686fc67dfd diff --git a/vss.c b/vss.c index b73562f9..cd55851c 100644 --- a/vss.c +++ b/vss.c @@ -1,8 +1,4 @@ -/* - * Copyright (C) 1997 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ +/* Copyright (C) 1997 Andre Noll , see file COPYING. */ /** \file vss.c The virtual streaming system. * @@ -29,45 +25,25 @@ #include "string.h" #include "afh.h" #include "afs.h" -#include "server.h" #include "net.h" +#include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "ipc.h" #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { /** Socket is inactive. */ AFS_SOCKET_READY, - /** Socket fd was included in the write fd set for select(). */ + /** Socket fd was monitored for writing. */ AFS_SOCKET_CHECK_FOR_WRITE, /** vss wrote a request to the socket and waits for reply from afs. */ AFS_SOCKET_AFD_PENDING @@ -172,14 +148,10 @@ struct fec_client { struct fec_group group; /** The current slice. */ uint8_t current_slice_num; - /** The data to be FEC-encoded (point to a region within the mapped audio file). */ - const unsigned char **src_data; + /** The data to be FEC-encoded. */ + unsigned char **src_data; /** Last time an audio header was sent. */ struct timeval next_header_time; - /** Used for the last source pointer of an audio file. */ - unsigned char *extra_src_buf; - /** Needed for the last slice of the audio file header. */ - unsigned char *extra_header_buf; /** Extra slices needed to store largest chunk + header. */ int num_extra_slices; /** Contains the FEC-encoded data. */ @@ -264,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst) return false; } +static int fc_num_data_slices(const struct fec_client *fc) +{ + return fc->fcp->data_slices_per_group + fc->num_extra_slices; +} + +static int fc_num_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group + fc->num_extra_slices; +} + +static int fc_num_redundant_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; +} + static int num_slices(long unsigned bytes, int max_payload, int rs) { int ret; @@ -294,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst) static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) { - int k, n, ret; + int i, k, n, ret; int hs, ds, rs; /* header/data/redundant slices */ struct fec_client_parms *fcp = fc->fcp; @@ -302,7 +289,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -314,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fc->mps <= FEC_HEADER_SIZE) return -ERRNO_TO_PARA_ERROR(EINVAL); - rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + /* free previous buffers, if any */ + if (fc->src_data) { + k = fc_num_data_slices(fc); + for (i = 0; i < k; i++) + free(fc->src_data[i]); + free(fc->src_data); + fc->src_data = NULL; + } + free(fc->enc_buf); + + rs = fc_num_redundant_slices(fc); ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs); if (ret < 0) return ret; @@ -330,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (k < fc->fcp->data_slices_per_group) k = fc->fcp->data_slices_per_group; fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - n = k + rs; + n = fc_num_slices(fc); + PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", + fc->mps, k, n, fc->num_extra_slices); + fec_free(fc->parms); ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", - fc->mps, k, n, fc->num_extra_slices); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - fc->enc_buf = para_realloc(fc->enc_buf, fc->mps); - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps); - fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps); + fc->src_data = arr_alloc(k, sizeof(char *)); + for (i = 0; i < k; i++) + fc->src_data[i] = alloc(fc->mps); + fc->enc_buf = alloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -349,8 +347,8 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) return 1; } -static void vss_get_chunk(int chunk_num, struct vss_task *vsst, - char **buf, size_t *sz) +static int vss_get_chunk(int chunk_num, struct vss_task *vsst, + char **buf, uint32_t *len) { int ret; @@ -365,32 +363,33 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst, if (chunk_num == 0 && vsst->header_len > 0) { assert(vsst->header_buf); *buf = vsst->header_buf; /* stripped header */ - *sz = vsst->header_len; - return; + *len = vsst->header_len; + return 0; } ret = afh_get_chunk(chunk_num, &mmd->afd.afhi, mmd->afd.audio_format_id, vsst->map, vsst->mapsize, - (const char **)buf, sz, &vsst->afh_context); + (const char **)buf, len, &vsst->afh_context); if (ret < 0) { - PARA_WARNING_LOG("could not get chunk %d: %s\n", - chunk_num, para_strerror(-ret)); *buf = NULL; - *sz = 0; + *len = 0; } + return ret; } -static void compute_group_size(struct vss_task *vsst, struct fec_group *g, +static int compute_group_size(struct vss_task *vsst, struct fec_group *g, int max_bytes) { char *buf; - size_t len; - int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); + uint32_t len; + int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); if (g->first_chunk == 0) { g->num_chunks = 1; - vss_get_chunk(0, vsst, &buf, &len); + ret = vss_get_chunk(0, vsst, &buf, &len); + if (ret < 0) + return ret; g->bytes = len; - return; + return 0; } g->num_chunks = 0; @@ -408,14 +407,20 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g, break; if (chunk_num >= mmd->afd.afhi.chunks_total) /* eof */ break; - vss_get_chunk(chunk_num, vsst, &buf, &len); + ret = vss_get_chunk(chunk_num, vsst, &buf, &len); + if (ret < 0) + return ret; if (g->bytes + len > max_bytes) break; /* Include this chunk */ g->bytes += len; g->num_chunks++; } - assert(g->num_chunks); + if (g->num_chunks == 0) + return -E_EOF; + PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num, + g->num_chunks, g->bytes); + return 1; } /* @@ -468,8 +473,8 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g, static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) { struct fec_group *g = &fc->group; - int k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - int n = fc->fcp->slices_per_group + fc->num_extra_slices; + int k = fc_num_data_slices(fc); + int n = fc_num_slices(fc); int ret, k1, k2, h, d, min, max, sum; int max_slice_bytes = fc->mps - FEC_HEADER_SIZE; int max_group_bytes; @@ -477,7 +482,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) if (!need_audio_header(fc, vsst)) { max_group_bytes = k * max_slice_bytes; g->num_header_slices = 0; - compute_group_size(vsst, g, max_group_bytes); + ret = compute_group_size(vsst, g, max_group_bytes); + if (ret < 0) + return ret; g->slice_bytes = DIV_ROUND_UP(g->bytes, k); if (g->slice_bytes == 0) g->slice_bytes = 1; @@ -493,7 +500,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) h = vsst->header_len; max_group_bytes = (k - num_slices(h, max_slice_bytes, n - k)) * max_slice_bytes; - compute_group_size(vsst, g, max_group_bytes); + ret = compute_group_size(vsst, g, max_group_bytes); + if (ret < 0) + return ret; d = g->bytes; if (d == 0) { g->slice_bytes = DIV_ROUND_UP(h, k); @@ -528,9 +537,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, n, data_slices; - size_t len; - char *buf, *p; + int ret, i, c; + size_t copy, src_copied, slice_copied; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -554,84 +562,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk += g->num_chunks; g->num++; } - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - n = fc->fcp->slices_per_group + fc->num_extra_slices; - compute_slice_size(fc, vsst); assert(g->slice_bytes > 0); - ret = num_slices(g->bytes, g->slice_bytes, n - k); - if (ret < 0) - return ret; - data_slices = ret; - assert(g->num_header_slices + data_slices <= k); fc->current_slice_num = 0; if (g->num == 0) set_group_timing(fc, vsst); /* setup header slices */ - buf = vsst->header_buf; - for (i = 0; i < g->num_header_slices; i++) { - uint32_t payload_size; - if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) { - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; - continue; - } - /* - * Can not use vss->header_buf for this slice as it - * goes beyond the buffer. This slice will not be fully - * used. - */ - payload_size = vsst->header_buf + vsst->header_len - buf; - memcpy(fc->extra_header_buf, buf, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_header_buf + payload_size, 0, - g->slice_bytes - payload_size); - /* - * There might be more than one header slice to fill although - * only the first one will be used. Set all header slices to - * our extra buffer. - */ - while (i < g->num_header_slices) - fc->src_data[i++] = fc->extra_header_buf; - break; /* we don't want i to be increased. */ + for (i = 0, src_copied = 0; i < g->num_header_slices; i++) { + copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied); + if (copy == 0) + break; + memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy); + if (copy < g->slice_bytes) + memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy); + src_copied += copy; } - /* - * Setup data slices. Note that for ogg streams chunk 0 points to a - * buffer on the heap rather than to the mapped audio file. + * There might be more than one header slice to fill although only the + * first one will be used. Zero out any remaining header slices. */ - vss_get_chunk(g->first_chunk, vsst, &buf, &len); - for (p = buf; i < g->num_header_slices + data_slices; i++) { - if (p + g->slice_bytes > buf + g->bytes) { - /* - * We must make a copy for this slice since using p - * directly would exceed the buffer. - */ - uint32_t payload_size = buf + g->bytes - p; - assert(payload_size + FEC_HEADER_SIZE <= fc->mps); - memcpy(fc->extra_src_buf, p, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_src_buf + payload_size, 0, - g->slice_bytes - payload_size); - fc->src_data[i] = fc->extra_src_buf; - i++; - break; + while (i < g->num_header_slices) + memset(fc->src_data[i++], 0, g->slice_bytes); + + slice_copied = 0; + for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) { + char *buf; + uint32_t src_len; + ret = vss_get_chunk(c, vsst, &buf, &src_len); + if (ret < 0) + return ret; + if (src_len == 0) + continue; + src_copied = 0; + while (src_copied < src_len) { + copy = PARA_MIN((size_t)g->slice_bytes - slice_copied, + src_len - src_copied); + memcpy(fc->src_data[i] + slice_copied, + buf + src_copied, copy); + src_copied += copy; + slice_copied += copy; + if (slice_copied == g->slice_bytes) { + i++; + slice_copied = 0; + } } - fc->src_data[i] = (const unsigned char *)p; - p += g->slice_bytes; - } - if (i < k) { - /* use arbitrary data for all remaining slices */ - buf = vsst->map; - for (; i < k; i++) - fc->src_data[i] = (const unsigned char *)buf; } + if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes) + memset(fc->src_data[i] + slice_copied, 0, + g->slice_bytes - slice_copied); + /* zero out remaining slices, if any */ + while (++i < fc_num_data_slices(fc)) + memset(fc->src_data[i], 0, g->slice_bytes); PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n", - g->slice_bytes, g->num_header_slices, data_slices + g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc) ); return 1; } @@ -651,8 +638,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) } } write_fec_header(fc, vsst); - fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, fc->group.slice_bytes); + fec_encode(fc->parms, (const unsigned char * const*)fc->src_data, + fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num, + fc->group.slice_bytes); return 1; } @@ -683,7 +671,7 @@ size_t vss_get_fec_eof_packet(const char **buf) struct fec_client *vss_add_fec_client(struct sender_client *sc, struct fec_client_parms *fcp) { - struct fec_client *fc = para_calloc(sizeof(*fc)); + struct fec_client *fc = zalloc(sizeof(*fc)); fc->sc = sc; fc->fcp = fcp; @@ -698,11 +686,15 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc, */ void vss_del_fec_client(struct fec_client *fc) { + int i; + list_del(&fc->node); - free(fc->src_data); free(fc->enc_buf); - free(fc->extra_src_buf); - free(fc->extra_header_buf); + if (fc->src_data) { + for (i = 0; i < fc_num_data_slices(fc); i++) + free(fc->src_data[i]); + free(fc->src_data); + } fec_free(fc->parms); free(fc); } @@ -835,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) if (sched_request_barrier(&vsst->data_send_barrier, s) == 1) return; /* - * Compute the select timeout as the minimal time until the next + * Compute the I/O timeout as the minimal time until the next * chunk/slice is due for any client. */ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, @@ -853,7 +845,6 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) static void vss_eof(struct vss_task *vsst) { - if (!vsst->map) return; if (mmd->new_vss_status_flags & VSS_NOMORE) @@ -864,8 +855,8 @@ static void vss_eof(struct vss_task *vsst) para_munmap(vsst->map, vsst->mapsize); vsst->map = NULL; mmd->chunks_sent = 0; - //mmd->offset = 0; mmd->afd.afhi.seconds_total = 0; + mmd->afd.afhi.chunks_total = 0; mmd->afd.afhi.chunk_tv.tv_sec = 0; mmd->afd.afhi.chunk_tv.tv_usec = 0; free(mmd->afd.afhi.chunk_table); @@ -901,21 +892,21 @@ static void set_mmd_offset(void) mmd->offset = tv2ms(&offset); } -static void vss_pre_select(struct sched *s, void *context) +static void vss_pre_monitor(struct sched *s, void *context) { int i; struct vss_task *vsst = context; if (need_to_request_new_audio_file(vsst)) { PARA_DEBUG_LOG("ready and playing, but no audio file\n"); - para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno); + sched_monitor_writefd(vsst->afs_socket, s); vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else - para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + sched_monitor_readfd(vsst->afs_socket, s); + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_monitor) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_monitor(s); } vss_compute_timeout(s, vsst); } @@ -955,16 +946,17 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif -static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) +static void recv_afs_result(struct vss_task *vsst, const struct sched *s) { int ret, passed_fd, shmid; uint32_t afs_code = 0, afs_data = 0; struct stat statbuf; - if (!FD_ISSET(vsst->afs_socket, rfds)) + if (!sched_read_ok(vsst->afs_socket, s)) return; ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data); if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN)) @@ -972,11 +964,17 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) if (ret < 0) goto err; vsst->afsss = AFS_SOCKET_READY; - PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, - afs_data); + if (afs_code == NO_ADMISSIBLE_FILES) { + PARA_NOTICE_LOG("no admissible files\n"); + ret = 0; + goto err; + } ret = -E_NOFD; - if (afs_code != NEXT_AUDIO_FILE) + if (afs_code != NEXT_AUDIO_FILE) { + PARA_ERROR_LOG("afs code: %u, expected: %d\n", afs_code, + NEXT_AUDIO_FILE); goto err; + } if (passed_fd < 0) goto err; shmid = afs_data; @@ -991,7 +989,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) goto err; } ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, - passed_fd, 0, &vsst->map); + passed_fd, &vsst->map); if (ret < 0) goto err; vsst->mapsize = statbuf.st_size; @@ -1007,26 +1005,26 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) return; err: free(mmd->afd.afhi.chunk_table); + mmd->afd.afhi.chunk_table = NULL; if (passed_fd >= 0) close(passed_fd); - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); mmd->new_vss_status_flags = VSS_NEXT; } /** - * Main sending function. - * - * This function gets called from vss_post_select(). It checks whether the next - * chunk of data should be pushed out. It obtains a pointer to the data to be - * sent out as well as its length from mmd->afd.afhi. This information is then - * passed to each supported sender's send() function as well as to the send() - * functions of each registered fec client. + * If the next chunk needs to be sent, pass a pointer to the chunk data to all + * registered fec clients and to each sender's ->send() method. */ static void vss_send(struct vss_task *vsst) { - int i, fec_active = 0; + int i, ret; + bool fec_active = false; struct timeval due; struct fec_client *fc, *tmp_fc; + char *buf; + uint32_t len; if (!vsst->map || !vss_playing()) return; @@ -1038,7 +1036,7 @@ static void vss_send(struct vss_task *vsst) if (fc->state == FEC_STATE_DISABLED) continue; if (!next_slice_is_due(fc, NULL)) { - fec_active = 1; + fec_active = true; continue; } if (compute_next_fec_slice(fc, vsst) <= 0) @@ -1048,7 +1046,7 @@ static void vss_send(struct vss_task *vsst) fc->current_slice_num++; fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, fc->group.slice_bytes + FEC_HEADER_SIZE); - fec_active = 1; + fec_active = true; } if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */ if (!fec_active) @@ -1057,62 +1055,49 @@ static void vss_send(struct vss_task *vsst) } compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &due); - if (tv_diff(&due, now, NULL) <= 0) { - char *buf; - size_t len; - - if (!mmd->chunks_sent) { - mmd->stream_start = *now; - mmd->events++; - set_mmd_offset(); - } + if (tv_diff(&due, now, NULL) > 0) + return; + if (!mmd->chunks_sent) { + mmd->stream_start = *now; + mmd->events++; + set_mmd_offset(); + } + ret = vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); + if (ret < 0) { + PARA_ERROR_LOG("could not get chunk %lu: %s\n", + mmd->current_chunk, para_strerror(-ret)); + } else { /* - * We call the send function also in case of empty chunks as - * they might have still some data queued which can be sent in - * this case. + * We call ->send() even if len is zero because senders might + * have data queued which can be sent now. */ - vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); - for (i = 0; senders[i].name; i++) { - if (!senders[i].send) + FOR_EACH_SENDER(i) { + if (!senders[i]->send) continue; - senders[i].send(mmd->current_chunk, mmd->chunks_sent, + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } - /* - * Prefault next chunk(s) - * - * If the backing device of the memory-mapped audio file is - * slow and read-ahead is turned off or prevented for some - * reason, e.g. due to memory pressure, it may take much longer - * than the chunk interval to get the next chunk on the wire, - * causing buffer underruns on the client side. Mapping the - * file with MAP_POPULATE seems to help a bit, but it does not - * eliminate the delays completely. Moreover, it is supported - * only on Linux. So we do our own read-ahead here. - */ - if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */ - buf += len; - for (i = 0; i < 5 && buf < vsst->map + vsst->mapsize; i++) { - __a_unused volatile char x = *buf; - buf += 4096; - } - } - mmd->chunks_sent++; - mmd->current_chunk++; } + mmd->chunks_sent++; + mmd->current_chunk++; } -static int vss_post_select(struct sched *s, void *context) +static int vss_post_monitor(struct sched *s, void *context) { int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1138,8 +1123,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1147,19 +1132,19 @@ static int vss_post_select(struct sched *s, void *context) mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) - recv_afs_result(vsst, &s->rfds); - else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { - PARA_NOTICE_LOG("requesting new fd from afs\n"); + recv_afs_result(vsst, s); + else if (sched_write_ok(vsst->afs_socket, s)) { + PARA_INFO_LOG("requesting new fd from afs\n"); ret = write_buffer(vsst->afs_socket, "new"); if (ret < 0) PARA_CRIT_LOG("%s\n", para_strerror(-ret)); else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_monitor) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_monitor(s); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1177,24 +1162,21 @@ static int vss_post_select(struct sched *s, void *context) * This also initializes all supported senders and starts streaming * if the --autoplay command line flag was given. */ -void init_vss_task(int afs_socket, struct sched *s) +void vss_init(int afs_socket, struct sched *s) { static struct vss_task vss_task_struct, *vsst = &vss_task_struct; int i; - char *hn = para_hostname(), *home = para_homedir(); long unsigned announce_time = OPT_UINT32_VAL(ANNOUNCE_TIME), autoplay_delay = OPT_UINT32_VAL(AUTOPLAY_DELAY); vsst->header_interval.tv_sec = 5; /* should this be configurable? */ vsst->afs_socket = afs_socket; ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); - INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + init_list_head(&fec_client_list); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } - free(hn); - free(home); mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { struct timeval tmp; @@ -1206,9 +1188,29 @@ void init_vss_task(int afs_socket, struct sched *s) &vsst->data_send_barrier); } vsst->task = task_register(&(struct task_info) { - .name = "vss task", - .pre_select = vss_pre_select, - .post_select = vss_post_select, + .name = "vss", + .pre_monitor = vss_pre_monitor, + .post_monitor = vss_post_monitor, .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdown method of all senders. + */ +void vss_shutdown(void) +{ + int i; + bool is_command_handler = process_is_command_handler(); + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + if (!is_command_handler) + PARA_NOTICE_LOG("shutting down %s sender\n", + senders[i]->name); + senders[i]->shutdown(); + } +}