From: Andre Noll <maan@systemlinux.org>
Date: Sun, 12 Aug 2007 16:36:53 +0000 (+0200)
Subject: Abstract out chunk_queue code.
X-Git-Tag: v0.2.17~31
X-Git-Url: https://git.tuebingen.mpg.de/?a=commitdiff_plain;h=c83577be9792e7872c70ba8f4d610278f01b8b11;p=paraslash.git

Abstract out chunk_queue code.

This moves the chunk handling code to separate functions cq_foo().  They
take pointers to a chunk queue, which makes them orthogonal to the rest of
http_send.c.
---

diff --git a/http_send.c b/http_send.c
index 2597066a..65b6c5a2 100644
--- a/http_send.c
+++ b/http_send.c
@@ -106,18 +106,96 @@ struct access_info {
 static int server_fd = -1, numclients;
 static struct sender *self;
 
-static void http_shutdown_client(struct http_client *hc, const char *msg)
+
+static int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
+	size_t sent)
+{
+	struct queued_chunk *qc;
+	char *buf;
+	size_t len;
+	int ret;
+
+	if (chunk_num != -1U) {
+		ret = vss_get_chunk(chunk_num, &buf, &len);
+		if (ret < 0)
+			return ret;
+	} else
+		buf = vss_get_header(&len);
+	if (cq->num_pending + len > cq->max_pending)
+		return -E_QUEUE;
+	qc = para_malloc(sizeof(struct queued_chunk));
+	cq->num_pending += len;
+	qc->chunk_num = chunk_num;
+	qc->sent = sent;
+	list_add_tail(&qc->node, &cq->q);
+	PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
+	return 1;
+}
+
+static struct queued_chunk *cq_peek(struct chunk_queue *cq)
+{
+	if (list_empty(&cq->q))
+		return NULL;
+	return list_entry(cq->q.next, struct queued_chunk, node);
+}
+
+int cq_dequeue(struct chunk_queue *cq)
+{
+	struct queued_chunk *qc = cq_peek(cq);
+	assert(qc);
+	list_del(&qc->node);
+	free(qc);
+	return 1;
+}
+
+void cq_update(struct chunk_queue *cq, size_t sent)
+{
+	struct queued_chunk *qc = cq_peek(cq);
+	assert(qc);
+	qc->sent += sent;
+	cq->num_pending -= sent;
+}
+
+int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+{
+	int ret;
+
+	if (qc->chunk_num != -1U) {
+		ret = vss_get_chunk(qc->chunk_num, buf, len);
+		if (ret < 0)
+			return ret;
+	} else
+		*buf = vss_get_header(len);
+	assert(*len > qc->sent);
+	*buf += qc->sent;
+	*len -= qc->sent;
+	return 1;
+}
+
+void cq_init(struct chunk_queue *cq, size_t max_pending)
+{
+	INIT_LIST_HEAD(&cq->q);
+	cq->max_pending = max_pending;
+	cq->num_pending = 0;
+}
+
+void cq_destroy(struct chunk_queue *cq)
 {
 	struct queued_chunk *qc, *tmp;
+	list_for_each_entry_safe(qc, tmp, &cq->q, node) {
+		list_del(&qc->node);
+		free(qc);
+	}
+}
+
+static void http_shutdown_client(struct http_client *hc, const char *msg)
+{
 	PARA_INFO_LOG("shutting down %s on fd %d (%s)\n", CLIENT_ADDR(hc),
 		hc->fd, msg);
 	numclients--;
 	close(hc->fd);
 	del_close_on_fork_list(hc->fd);
-	list_for_each_entry_safe(qc, tmp, &hc->cq.q, node) {
-		list_del(&qc->node);
-		free(qc);
-	}
+	cq_destroy(&hc->cq);
 	list_del(&hc->node);
 	free(hc);
 }
@@ -150,61 +228,24 @@ static int http_send_err_msg(struct http_client *hc)
 	return http_send_msg(hc, HTTP_ERR_MSG);
 }
 
-static int enqueue_chunk(struct chunk_queue *cq, long unsigned chunk_num,
-	size_t sent)
-{
-	struct queued_chunk *qc;
-	char *buf;
-	size_t len;
-	int ret;
-
-	if (chunk_num != -1U) {
-		ret = vss_get_chunk(chunk_num, &buf, &len);
-		if (ret < 0)
-			return ret;
-	} else
-		buf = vss_get_header(&len);
-	if (cq->num_pending + len > MAX_BACKLOG)
-		return -E_QUEUE;
-	qc = para_malloc(sizeof(struct queued_chunk));
-	cq->num_pending += len;
-	qc->chunk_num = chunk_num;
-	qc->sent = sent;
-	list_add_tail(&qc->node, &cq->q);
-	PARA_INFO_LOG("%lu bytes queued for q %p\n", cq->num_pending, &cq->q);
-	return 1;
-}
 
 static int send_queued_chunks(struct http_client *hc)
 {
-	int ret;
-	struct queued_chunk *qc, *tmp;
-
-	if (list_empty(&hc->cq.q))
-		return 1;
-	list_for_each_entry_safe(qc, tmp, &hc->cq.q, node) {
+	struct queued_chunk *qc;
+	while ((qc = cq_peek(&hc->cq))) {
 		char *buf;
 		size_t len;
-		ret = write_ok(hc->fd);
+		int ret = write_ok(hc->fd);
 		if (ret <= 0)
 			return ret? -E_WRITE_OK : 0;
-		if (qc->chunk_num != -1U) {
-			ret = vss_get_chunk(qc->chunk_num, &buf, &len);
-			if (ret < 0)
-				return ret;
-		} else
-			buf = vss_get_header(&len);
-		assert(len && len > qc->sent);
-		ret = write(hc->fd, buf + qc->sent, len - qc->sent);
+		cq_get(qc, &buf, &len);
+		ret = write(hc->fd, buf, len);
 		if (ret < 0)
 			return -1; /* FIXME */
-		hc->cq.num_pending -= ret;
-		if (ret != len - qc->sent) {
-			qc->sent += ret;
-			return 0;
-		}
-		list_del(&qc->node);
-		free(qc);
+		cq_update(&hc->cq, ret);
+		if (ret != len)
+			return 1;
+		cq_dequeue(&hc->cq);
 	}
 	return 1;
 }
@@ -212,7 +253,7 @@ static int send_queued_chunks(struct http_client *hc)
 static int queue_chunk_or_shutdown(struct http_client *hc, long unsigned chunk_num,
 	size_t sent)
 {
-	int ret = enqueue_chunk(&hc->cq, chunk_num, sent);
+	int ret = cq_enqueue(&hc->cq, chunk_num, sent);
 	if (ret < 0)
 		http_shutdown_client(hc, "queue error");
 	return ret;
@@ -248,9 +289,6 @@ static void http_send( long unsigned current_chunk,
 		if (!len)
 			continue;
 		if (!ret || write_ok(hc->fd) <= 0) {
-			PARA_INFO_LOG("fd %d not ready (%lu bytes queued),"
-				" trying to queue chunk\n", hc->fd,
-				hc->cq.num_pending);
 			queue_chunk_or_shutdown(hc, current_chunk, 0);
 			continue;
 		}
@@ -342,7 +380,7 @@ static void http_post_select(fd_set *rfds, fd_set *wfds)
 		goto err_out;
 	}
 	hc->status = HTTP_CONNECTED;
-	INIT_LIST_HEAD(&hc->cq.q);
+	cq_init(&hc->cq, MAX_BACKLOG);
 	PARA_INFO_LOG("accepted client #%d: %s (fd %d)\n", numclients,
 		CLIENT_ADDR(hc), hc->fd);
 	numclients++;