/** \file http_send.c paraslash's http sender */
+#include <sys/types.h>
+#include <dirent.h>
+#include "para.h"
#include "server.cmdline.h"
+#include "afh.h"
#include "server.h"
#include "http.h"
#include "vss.h"
#include "net.h"
#include "string.h"
#include "fd.h"
+#include "chunk_queue.h"
/** \cond convert sock_addr_in to ascii */
#define CLIENT_ADDR(hc) inet_ntoa((hc)->addr.sin_addr)
};
/** Clients will be kicked if there are more than that many bytes pending. */
-#define MAX_BACKLOG 40000
+#define MAX_BACKLOG 400000
/** The list of connected clients. */
static struct list_head clients;
/** The whitelist/blacklist. */
int check_w;
/** The position of this client in the client list. */
struct list_head node;
- /** Te list of pending packets for this client. */
- struct list_head packet_queue;
- /** The number of pending bytes for this client. */
- unsigned long pq_bytes;
-};
-
-/**
- * Describes one queued data packet for a client.
- *
- * The send function of the http sender checks each client fd for writing. If a
- * client fd is not ready, it tries to queue that packet for this client until
- * the number of queued bytes exceeds \p MAX_BACKLOG.
- */
-struct queued_packet {
- /** The length of the packet in bytes. */
- unsigned int len;
- /** Pointer to the packet data. */
- char *packet;
- /** Position of the packet in the packet list. */
- struct list_head node;
+ /** The list of pending chunks for this client. */
+ struct chunk_queue *cq;
};
/**
static int server_fd = -1, numclients;
static struct sender *self;
+
static void http_shutdown_client(struct http_client *hc, const char *msg)
{
- struct queued_packet *qp, *tmp;
PARA_INFO_LOG("shutting down %s on fd %d (%s)\n", CLIENT_ADDR(hc),
hc->fd, msg);
numclients--;
close(hc->fd);
del_close_on_fork_list(hc->fd);
- list_for_each_entry_safe(qp, tmp, &hc->packet_queue, node) {
- free(qp->packet);
- list_del(&qp->node);
- free(qp);
- }
+ cq_destroy(hc->cq);
list_del(&hc->node);
free(hc);
}
return http_send_msg(hc, HTTP_ERR_MSG);
}
-static int queue_packet(struct http_client *hc, const char *buf, size_t len)
-{
- struct queued_packet *qp;
- if (hc->pq_bytes + len > MAX_BACKLOG) {
- http_shutdown_client(hc, "packet queue overrun");
- return -E_QUEUE;
- }
- qp = para_malloc(sizeof(struct queued_packet));
- hc->pq_bytes += len;
- qp->packet = para_malloc(len);
- memcpy(qp->packet, buf, len);
- qp->len = len;
- list_add_tail(&qp->node, &hc->packet_queue);
- PARA_INFO_LOG("%lu bytes queued for fd %d\n", hc->pq_bytes, hc->fd);
- return 1;
-}
-
-static int send_queued_packets(struct http_client *hc)
+static int send_queued_chunks(struct http_client *hc)
{
- int ret;
- struct queued_packet *qp, *tmp;
-
- if (list_empty(&hc->packet_queue))
- return 1;
- list_for_each_entry_safe(qp, tmp, &hc->packet_queue, node) {
- ret = write_ok(hc->fd);
+ struct queued_chunk *qc;
+ while ((qc = cq_peek(hc->cq))) {
+ char *buf;
+ size_t len;
+ int ret = write_ok(hc->fd);
if (ret <= 0)
return ret? -E_WRITE_OK : 0;
- ret = write(hc->fd, qp->packet, qp->len);
+ cq_get(qc, &buf, &len);
+ ret = write(hc->fd, buf, len);
if (ret < 0)
- return ret;
- if (ret != qp->len) {
- qp->len -= ret;
- memmove(qp->packet, qp->packet + ret, qp->len);
- return 0;
- }
- hc->pq_bytes -= qp->len;
- free(qp->packet);
- list_del(&qp->node);
- free(qp);
+ return -E_SEND_QUEUED_CHUNK;
+ cq_update(hc->cq, ret);
+ if (ret != len)
+ return 1;
+ cq_dequeue(hc->cq);
}
return 1;
}
+static int queue_chunk_or_shutdown(struct http_client *hc, long unsigned chunk_num,
+ size_t sent)
+{
+ int ret = cq_enqueue(hc->cq, chunk_num, sent);
+ if (ret < 0)
+ http_shutdown_client(hc, "queue error");
+ return ret;
+}
+
static void http_send( long unsigned current_chunk,
__a_unused long unsigned chunks_sent, const char *buf, size_t len)
{
hc->status != HTTP_READY_TO_STREAM)
continue;
if (hc->status == HTTP_READY_TO_STREAM) {
- unsigned hlen;
+ size_t hlen;
char *hbuf = vss_get_header(&hlen);
if (hbuf && hlen > 0 && current_chunk) {
/* need to send header */
- PARA_INFO_LOG("queueing header: %d\n", hlen);
- if (queue_packet(hc, hbuf, hlen) < 0)
+ PARA_INFO_LOG("queueing header: %zu\n", hlen);
+ if (queue_chunk_or_shutdown(hc, -1U, 0) < 0)
continue;
} else
- PARA_INFO_LOG("%s", "no need to queue header\n");
+ PARA_INFO_LOG("no need to queue header\n");
hc->status = HTTP_STREAMING;
}
- ret = send_queued_packets(hc);
+ ret = send_queued_chunks(hc);
if (ret < 0) {
- http_shutdown_client(hc, "send error");
+ http_shutdown_client(hc, "queue send error");
continue;
}
if (!len)
continue;
if (!ret || write_ok(hc->fd) <= 0) {
- PARA_INFO_LOG("fd %d not ready (%lu bytes queued),"
- " trying to queue packet\n", hc->fd,
- hc->pq_bytes);
- queue_packet(hc, buf, len);
+ queue_chunk_or_shutdown(hc, current_chunk, 0);
continue;
}
// PARA_DEBUG_LOG("sending %d -> %s\n", len, CLIENT_ADDR(hc));
ret = write(hc->fd, buf, len);
+// PARA_DEBUG_LOG("ret: %d\n", ret);
if (ret < 0) {
http_shutdown_client(hc, "send error");
continue;
}
if (ret != len)
- queue_packet(hc, buf + ret, len - ret);
+ queue_chunk_or_shutdown(hc, current_chunk, ret);
}
}
goto err_out;
}
hc->status = HTTP_CONNECTED;
- INIT_LIST_HEAD(&hc->packet_queue);
+ hc->cq = cq_new(MAX_BACKLOG);
PARA_INFO_LOG("accepted client #%d: %s (fd %d)\n", numclients,
CLIENT_ADDR(hc), hc->fd);
numclients++;