btr: Make it kind of work for http recv + stdout.
authorAndre Noll <maan@systemlinux.org>
Mon, 28 Dec 2009 23:42:47 +0000 (00:42 +0100)
committerAndre Noll <maan@systemlinux.org>
Mon, 28 Dec 2009 23:42:47 +0000 (00:42 +0100)
buffer_tree.c
buffer_tree.h
configure.ac
error.h
http_recv.c
recv.c
stdout.c
stdout.h

index 941d58d..1046460 100644 (file)
@@ -141,14 +141,6 @@ bool btr_inplace_ok(struct btr_node *btrn)
        return list_is_singular(&btrn->parent->children);
 }
 
-struct btr_buffer_reference *btr_next_br(struct btr_node *btrn)
-{
-       if (list_empty(&btrn->input_queue))
-               return NULL;
-       return list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-}
-
-
 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
 {
        return br->btrb->size - br->consumed;
@@ -160,9 +152,26 @@ size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
        return br_available_bytes(br);
 }
 
-void btr_increase_used_bytes(struct btr_buffer_reference *br, size_t consumed)
+size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
+{
+       struct btr_buffer_reference *br;
+
+       if (list_empty(&btrn->input_queue)) {
+               *bufp = NULL;
+               return 0;
+       }
+       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
+       return btr_get_buffer_by_reference(br, bufp);
+}
+
+void btr_consume(struct btr_node *btrn, size_t numbytes)
 {
-       br->consumed += consumed;
+       struct btr_buffer_reference *br;
+
+       assert(!list_empty(&btrn->input_queue));
+       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
+       assert(br->consumed + numbytes <= br->btrb->size);
+       br->consumed += numbytes;
        if (br->consumed == br->btrb->size)
                btr_drop_buffer_reference(br);
 }
@@ -178,6 +187,9 @@ void btr_del_node(struct btr_node *btrn)
 {
        struct btr_node *ch;
 
+       if (!btrn)
+               return;
+       PARA_NOTICE_LOG("deleting %s\n", btrn->name);
        FOR_EACH_CHILD(ch, btrn)
                ch->parent = NULL;
        flush_input_queue(btrn);
index b066ad9..d340973 100644 (file)
@@ -6,3 +6,7 @@ void btr_del_node(struct btr_node *btrn);
 void btr_add_output(char *buf, size_t size, struct btr_node *btrn);
 bool btr_is_leaf_node(struct btr_node *btrn);
 size_t btr_bytes_pending(struct btr_node *btrn);
+size_t btr_get_input_queue_size(struct btr_node *btrn);
+bool btr_no_parent(struct btr_node *btrn);
+size_t btr_next_buffer(struct btr_node *btrn, char **bufp);
+void btr_consume(struct btr_node *btrn, size_t numbytes);
index f7e3f8d..aa62ebc 100644 (file)
@@ -108,7 +108,7 @@ senders=" http dccp udp"
 filter_cmdline_objs="add_cmdline(filter compress_filter amp_filter prebuffer_filter)"
 filter_errlist_objs="filter_common wav_filter compress_filter filter string
        stdin stdout sched fd amp_filter ggo fecdec_filter fec
-       prebuffer_filter time bitstream imdct wma_common wmadec_filter"
+       prebuffer_filter time bitstream imdct wma_common wmadec_filter buffer_tree"
 filter_ldflags="-lm"
 filters=" compress wav amp fecdec wmadec prebuffer"
 
@@ -147,7 +147,7 @@ default_writer="FILE_WRITE"
 
 client_cmdline_objs="add_cmdline(client)"
 client_errlist_objs="client net string crypt fd sched stdin stdout
-       client_common sha1"
+       client_common sha1 buffer_tree"
 client_ldflags=""
 
 gui_cmdline_objs="add_cmdline(gui)"
diff --git a/error.h b/error.h
index 6b5e300..4aec60d 100644 (file)
--- a/error.h
+++ b/error.h
@@ -23,7 +23,6 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define SHA1_ERRORS
 #define RBTREE_ERRORS
 #define RECV_ERRORS
-#define STDOUT_ERRORS
 #define IPC_ERRORS
 #define DCCP_SEND_ERRORS
 #define HTTP_SEND_ERRORS
@@ -39,6 +38,10 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 
 extern const char **para_errlist[];
 
+#define STDOUT_ERRORS \
+       PARA_ERROR(ORPHAN, "orphaned (EOF)"), \
+
+
 #define BITSTREAM_ERRORS \
        PARA_ERROR(VLC, "invalid vlc code"), \
 
index bedd989..abb8c1e 100644 (file)
@@ -108,9 +108,9 @@ static void http_recv_post_select(struct sched *s, struct task *t)
 
        if (rn->output_error && *rn->output_error < 0) {
                t->error = *rn->output_error;
-               return;
+               goto err;
        }
-       if  (phd->status == HTTP_CONNECTED) {
+       if (phd->status == HTTP_CONNECTED) {
                char *rq;
                if (!FD_ISSET(phd->fd, &s->wfds))
                        return;
@@ -118,25 +118,27 @@ static void http_recv_post_select(struct sched *s, struct task *t)
                PARA_INFO_LOG("sending http request\n");
                t->error = send_va_buffer(phd->fd, "%s", rq);
                free(rq);
-               if (t->error >= 0)
-                       phd->status = HTTP_SENT_GET_REQUEST;
+               if (t->error < 0)
+                       goto err;
+               phd->status = HTTP_SENT_GET_REQUEST;
                return;
        }
        if (!FD_ISSET(phd->fd, &s->rfds))
                return;
        if (phd->status == HTTP_SENT_GET_REQUEST) {
                t->error = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
-               if (t->error >= 0) {
-                       PARA_INFO_LOG("received ok msg, streaming\n");
-                       phd->status = HTTP_STREAMING;
-               }
+               if (t->error < 0)
+                       goto err;
+               PARA_INFO_LOG("received ok msg, streaming\n");
+               phd->status = HTTP_STREAMING;
                return;
        }
        if (conf->buffer_tree_given) {
                char *buf;
+
                if (btr_bytes_pending(rn->btrn) > HTTP_RECV_MAX_PENDING) {
                        t->error = -E_HTTP_RECV_OVERRUN;
-                       return;
+                       goto err;
                }
                buf = para_malloc(HTTP_RECV_READ_BUF_SIZE);
                t->error = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE);
@@ -144,26 +146,36 @@ static void http_recv_post_select(struct sched *s, struct task *t)
                        t->error = -E_RECV_EOF;
                if (t->error < 0) {
                        free(buf);
-                       return;
+                       goto err;
                }
                btr_add_output(buf, t->error, rn->btrn);
                return;
        }
        t->error = -E_HTTP_RECV_OVERRUN;
        if (rn->loaded >= BUFSIZE)
-               return;
+               goto err;
        t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
                BUFSIZE - rn->loaded);
        if (t->error == 0)
                t->error = -E_RECV_EOF;
        if (t->error < 0)
-               return;
+               goto err;
        rn->loaded += t->error;
+       return;
+err:
+       if (conf->buffer_tree_given) {
+               btr_del_node(rn->btrn);
+               rn->btrn = NULL;
+       }
 }
 
 static void http_recv_close(struct receiver_node *rn)
 {
        struct private_http_recv_data *phd = rn->private_data;
+       struct http_recv_args_info *conf = rn->conf;
+
+       if (conf->buffer_tree_given)
+               btr_del_node(rn->btrn);
        close(phd->fd);
        free(rn->buf);
        free(rn->private_data);
@@ -198,6 +210,8 @@ static int http_recv_open(struct receiver_node *rn)
        rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data));
        phd->fd = fd;
        phd->status = HTTP_CONNECTED;
+       if (conf->buffer_tree_given)
+               rn->btrn = btr_new_node("receiver", NULL);
        return 1;
 }
 
diff --git a/recv.c b/recv.c
index 1bb0532..297fbe9 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -92,12 +92,14 @@ int main(int argc, char *argv[])
        }
        r = &receivers[receiver_num];
        rn.receiver = r;
-       rn.btrn = btr_new_node("receiver", NULL /* no parent */);
        ret = r->open(&rn);
        if (ret < 0)
                goto out;
        r_opened = 1;
 
+       if (conf.buffer_tree_given)
+               sot.btrn = btr_new_node("stdout", rn.btrn);
+
        stdout_set_defaults(&sot);
        sot.bufp = &rn.buf;
        sot.loaded = &rn.loaded;
@@ -111,10 +113,8 @@ int main(int argc, char *argv[])
 
        ret = schedule(&s);
 out:
-       if (r_opened) {
-               btr_del_node(rn.btrn);
+       if (r_opened)
                r->close(&rn);
-       }
        if (r)
                r->shutdown();
        if (ret < 0)
index 4e3b6df..6539b68 100644 (file)
--- a/stdout.c
+++ b/stdout.c
@@ -16,6 +16,7 @@
 #include "fd.h"
 #include "error.h"
 #include "stdout.h"
+#include "buffer_tree.h"
 
 /**
  * The pre_select function of the stdout task.
@@ -44,6 +45,26 @@ static void stdout_pre_select(struct sched *s, struct task *t)
        para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
 }
 
+static void stdout_pre_select_btr(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = container_of(t, struct stdout_task, task);
+       size_t sz = btr_get_input_queue_size(sot->btrn);
+
+       t->error = 0;
+       sot->check_fd = 0;
+       if (sz == 0) {
+               if (btr_no_parent(sot->btrn)) {
+                       t->error = -E_ORPHAN;
+                       btr_del_node(sot->btrn);
+                       s->timeout.tv_sec = 0;
+                       s->timeout.tv_usec = 1;
+               }
+               return;
+       }
+       sot->check_fd = 1;
+       para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
 /**
  * The post select function of the stdout task.
  *
@@ -78,6 +99,37 @@ static void stdout_post_select(struct sched *s, struct task *t)
                memmove(*sot->bufp, *sot->bufp + ret, *sot->loaded);
 }
 
+static void stdout_post_select_btr(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = container_of(t, struct stdout_task, task);
+       ssize_t ret;
+       size_t sz = btr_get_input_queue_size(sot->btrn);
+       bool orphan = btr_no_parent(sot->btrn);
+       char *buf;
+
+       t->error = 0;
+       if (!sot->check_fd) {
+               if (sz == 0 && orphan) {
+                       t->error = -E_ORPHAN;
+                       goto err;
+               }
+               return;
+       }
+       if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+               return;
+       sz = btr_next_buffer(sot->btrn, &buf);
+       if (sz == 0)
+               return;
+       ret = write(STDOUT_FILENO, buf, sz);
+       if (ret < 0) {
+               t->error = -ERRNO_TO_PARA_ERROR(errno);
+               goto err;
+       }
+       btr_consume(sot->btrn, ret);
+       return;
+err:
+       btr_del_node(sot->btrn);
+}
 /**
  * Initialize a stdout task structure with default values.
  *
@@ -90,9 +142,13 @@ void stdout_set_defaults(struct stdout_task *sot)
 {
        int ret;
 
-       sot->use_buffer_tree = false;
-       sot->task.pre_select = stdout_pre_select;
-       sot->task.post_select = stdout_post_select;
+       if (sot->btrn) {
+               sot->task.pre_select = stdout_pre_select_btr;
+               sot->task.post_select = stdout_post_select_btr;
+       } else {
+               sot->task.pre_select = stdout_pre_select;
+               sot->task.post_select = stdout_post_select;
+       }
        sprintf(sot->task.status, "stdout writer");
        ret = mark_fd_nonblocking(STDOUT_FILENO);
        if (ret >= 0)
index c3eb311..fee9800 100644 (file)
--- a/stdout.h
+++ b/stdout.h
@@ -20,8 +20,8 @@ struct stdout_task {
        struct task task;
        /** Whether \p STDOUT_FILENO was included in the write fd set. */
        int check_fd;
-       /** Whether to use the buffer tree API. */
-       bool use_buffer_tree;
+       /** Non-null if buffer tree API should be used. */
+       struct btr_node *btrn;
 };
 
 void stdout_set_defaults(struct stdout_task *sot);