From: Andre Noll <maan@systemlinux.org>
Date: Sun, 1 Mar 2009 22:09:51 +0000 (+0100)
Subject: vss.c: Improve the fec timing code.
X-Git-Tag: v0.3.4~57^2
X-Git-Url: https://git.tuebingen.mpg.de/?a=commitdiff_plain;h=2c5362b499b106b10e7be985660701f2c3c6af79;p=paraslash.git

vss.c: Improve the fec timing code.

This patch

	- Fixes a the bug where assert(start_buf) caused para_server
	to abort.
	- Gets rid of buffer underruns by introducing a quick hack that
	sends all but the first slice a bit early.
	- Moves some code from compute_next_fec_slice() to
	setup_next_fec_group() where it belongs.
---

diff --git a/vss.c b/vss.c
index b8c7463b..dc598936 100644
--- a/vss.c
+++ b/vss.c
@@ -179,46 +179,77 @@ struct timeval *vss_chunk_time(void)
 	return &mmd->afd.afhi.chunk_tv;
 }
 
-static void setup_fec_group(struct fec_client *fc, struct vss_task *vsst)
+/**
+ * Write a fec header to a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static void write_fec_header(struct fec_client *fc)
+{
+	char *buf = (char *)fc->enc_buf;
+
+	write_u32(buf, FEC_MAGIC);
+
+	write_u8(buf + 4, fc->fcp->slices_per_group);
+	write_u8(buf + 5, fc->fcp->data_slices_per_group);
+	write_u32(buf + 6, (uint32_t)0); /* audio header len */
+
+	write_u32(buf + 10, fc->group.num);
+	write_u32(buf + 14, fc->group.bytes);
+
+	write_u8(buf + 18, fc->slice.num);
+	write_u16(buf + 20, fc->slice.bytes);
+	memset(buf + 22, 0, 11);
+}
+
+static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
 {
-	uint32_t num_bytes = 0, chunk_num, max_group_size, last_payload_size;
+	uint32_t max_group_size, last_payload_size;
 	int i, k = fc->fcp->data_slices_per_group;
-	const char *start_buf = NULL;
+	size_t len;
+	const char *start_buf;
 	struct timeval tmp, *chunk_tv = vss_chunk_time();
 
 	assert(chunk_tv);
+	if (fc->first_stream_chunk < 0) {
+		fc->stream_start = *now;
+		fc->first_stream_chunk = mmd->current_chunk;
+		fc->group.first_chunk = mmd->current_chunk;
+		fc->group.num = 0;
+	} else {
+		fc->group.first_chunk += fc->group.num_chunks;
+		fc->group.num++;
+	}
+	if (fc->group.first_chunk >= mmd->afd.afhi.chunks_total)
+		return 0;
 	max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k;
-	chunk_num = fc->group.first_chunk;
-	for (;;) {
+	afh_get_chunk(fc->group.first_chunk, &mmd->afd.afhi, vsst->map,
+		&start_buf, &len);
+	for (i = fc->group.first_chunk, fc->group.bytes = 0;
+			i < mmd->afd.afhi.chunks_total; i++) {
 		const char *buf;
-		size_t len;
 
-		if (chunk_num >= mmd->afd.afhi.chunks_total)
+		afh_get_chunk(i, &mmd->afd.afhi, vsst->map, &buf, &len);
+		if (fc->group.bytes + len > max_group_size)
 			break;
-		afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, &buf, &len);
-		if (!start_buf)
-			start_buf = buf;
-		if (num_bytes + len > max_group_size)
-			break;
-		chunk_num++;
-		num_bytes += len;
+		fc->group.bytes += len;
 	}
-	assert(start_buf);
-	fc->group.num_chunks = chunk_num - fc->group.first_chunk;
-	fc->group.num++;
-	fc->group.bytes = num_bytes;
+	fc->group.num_chunks = i - fc->group.first_chunk;
 	fc->slice.num = 0;
-	fc->slice.bytes = ROUND_UP(num_bytes, k) / k;
+	fc->slice.bytes = ROUND_UP(fc->group.bytes, k) / k;
 
 	/* The last slice will not be fully used */
-	last_payload_size = num_bytes % fc->slice.bytes;
+	last_payload_size = fc->group.bytes % fc->slice.bytes;
 	if (!last_payload_size)
 		last_payload_size = fc->slice.bytes;
 
-	tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration);
 	tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv,
 		&tmp);
 	tv_add(&fc->stream_start, &tmp, &fc->group.start);
+	if (fc->group.num) /* quick hack to avoid buffer underruns */
+		fc->group.start.tv_sec--;
+	tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration);
 	tv_divide(fc->fcp->slices_per_group, &fc->group.duration,
 		&fc->group.slice_duration);
 
@@ -228,39 +259,38 @@ static void setup_fec_group(struct fec_client *fc, struct vss_task *vsst)
 
 	if (start_buf + k * fc->slice.bytes > vsst->map + mmd->size) {
 		/* can not use last slice as it goes beyond the map */
-		if (fc->extra_src_buf_size < fc->slice.bytes)
+		if (fc->extra_src_buf_size < fc->slice.bytes) {
 			fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes);
+			fc->extra_src_buf_size = fc->slice.bytes;
+		}
 		memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes,
 			last_payload_size);
 		memset(fc->extra_src_buf + last_payload_size, 0,
 			fc->slice.bytes - last_payload_size);
 		fc->src_data[k - 1] = fc->extra_src_buf;
 	}
-
+	if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) {
+		fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE;
+		fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size);
+	}
+	PARA_INFO_LOG("FEC group %d: %d chunks (%d - %d), duration: %lums\n",
+		fc->group.num, fc->group.num_chunks, fc->group.first_chunk,
+		fc->group.first_chunk + fc->group.num_chunks - 1,
+		tv2ms(&fc->group.duration));
+	return 1;
 }
 
-/**
- * Write a fec header to a buffer.
- *
- * \param buf The buffer to write to.
- * \param h The fec header to write.
- */
-static void write_fec_header(struct fec_client *fc)
+static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
 {
-	char *buf = (char *)fc->enc_buf;
-
-	write_u32(buf, FEC_MAGIC);
-
-	write_u8(buf + 4, fc->fcp->slices_per_group);
-	write_u8(buf + 5, fc->fcp->data_slices_per_group);
-	write_u32(buf + 6, (uint32_t)0); /* audio header len */
-
-	write_u32(buf + 10, fc->group.num);
-	write_u32(buf + 14, fc->group.bytes);
-
-	write_u8(buf + 18, fc->slice.num);
-	write_u16(buf + 20, fc->slice.bytes);
-	memset(buf + 22, 0, 11);
+	if (fc->first_stream_chunk < 0 || fc->slice.num
+			== fc->fcp->slices_per_group) {
+		if (!setup_next_fec_group(fc, vsst))
+			return 0;
+	}
+	write_fec_header(fc);
+	fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
+		fc->slice.num, fc->slice.bytes);
+	return 1;
 }
 
 /**
@@ -283,28 +313,6 @@ size_t vss_get_fec_eof_packet(const char **buf)
 	return FEC_HEADER_SIZE;
 }
 
-static void compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
-{
-	if (fc->first_stream_chunk < 0) {
-		fc->stream_start = *now;
-		fc->first_stream_chunk = mmd->current_chunk;
-		fc->group.first_chunk = mmd->current_chunk;
-		fc->group.num = 0;
-		setup_fec_group(fc, vsst);
-	} else if (fc->slice.num == fc->fcp->slices_per_group) {
-		fc->group.first_chunk += fc->group.num_chunks;
-		setup_fec_group(fc, vsst);
-
-	}
-	if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) {
-		fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE;
-		fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size);
-	}
-	write_fec_header(fc);
-	fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
-		fc->slice.num, fc->slice.bytes);
-}
-
 /**
  * Add one entry to the list of active fec clients.
  *
@@ -761,7 +769,8 @@ static void vss_send(struct vss_task *vsst)
 	list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
 		if (!next_slice_is_due(fc, NULL))
 			continue;
-		compute_next_fec_slice(fc, vsst);
+		if (!compute_next_fec_slice(fc, vsst))
+			continue;
 		PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num,
 			fc->slice.num, fc->slice.bytes);
 		fc->fcp->send((char *)fc->enc_buf,