]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
[btr] Add the node type parameter to btr_node_status().
authorAndre Noll <maan@systemlinux.org>
Tue, 5 Jan 2010 18:04:04 +0000 (19:04 +0100)
committerAndre Noll <maan@systemlinux.org>
Tue, 5 Jan 2010 18:04:04 +0000 (19:04 +0100)
This allows to have a single function which can be called from
both the pre_select and the post_select methods of all receivers/
filters/writers and from the stdin and stdout tasks.

16 files changed:
aacdec_filter.c
alsa_write.c
amp_filter.c
buffer_tree.c
buffer_tree.h
compress_filter.c
error.h
file_write.c
filter_common.c
http_recv.c
mp3dec_filter.c
oggdec_filter.c
oss_write.c
stdin.c
stdout.c
wmadec_filter.c

index 602df63ef7bd91520f426cbe35ea85cc7ae91fe0..e0ca6e226f16bb0d788720ba7affcdd5691978ba 100644 (file)
@@ -220,7 +220,7 @@ static void aacdec_post_select(__a_unused struct sched *s, struct task *t)
 
 next_buffer:
        t->error = 0;
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)
index 016d8ab3c32547e2ab6fcc46902860db9a713271..77621979783955dd09a48464cee6e9d7dceafbbb 100644 (file)
@@ -172,7 +172,7 @@ static int alsa_write_pre_select(struct sched *s, struct writer_node *wn)
        if (!pad->handle)
                return 1;
        if (wn->btrn) {
-               int ret = btr_node_status(wn->btrn, wn->min_iqs);
+               int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
                if (ret == 0)
                        return 1;
                if (ret < 0) {
@@ -307,7 +307,7 @@ static void alsa_write_post_select_btr(__a_unused struct sched *s,
 
 again:
        t->error = 0;
-       ret = btr_node_status(btrn, wn->min_iqs);
+       ret = btr_node_status(btrn, wn->min_iqs, BTR_NT_LEAF);
        if (ret == 0)
                return;
        btr_merge(btrn, wn->min_iqs);
index 3154735e1753c87872963be7f5f7d83c47640655..d0a893f47518d8c8c33f957ad9b36ab6b80a38b1 100644 (file)
@@ -116,7 +116,7 @@ static void amp_post_select(__a_unused struct sched *s, struct task *t)
                return;
        }
 next_buffer:
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)
index 022a4118a76d0191d197b5cda6bc64b77eff924c..23599b834a26e31adda87ecdb49da7238b0d1d10 100644 (file)
@@ -412,18 +412,25 @@ void btr_log_tree(struct btr_node *btrn, int loglevel)
 /** 640K ought to be enough for everybody ;) */
 #define BTRN_MAX_PENDING (640 * 1024)
 
-int btr_node_status(struct btr_node *btrn, size_t min_iqs)
+int btr_node_status(struct btr_node *btrn, size_t min_iqs,
+       enum btr_node_type type)
 {
        size_t iqs;
 
-       if (btr_eof(btrn))
-               return -E_BTR_EOF;
-       if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
-               return 0;
-       iqs = btr_get_input_queue_size(btrn);
-       if (iqs == 0) /* we have a parent, because not eof */
-               return 0;
-       if (iqs < min_iqs && !btr_no_parent(btrn))
-               return 0;
+       if (type != BTR_NT_LEAF) {
+               if (btr_no_children(btrn))
+                       return -E_BTR_NO_CHILD;
+               if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
+                       return 0;
+       }
+       if (type != BTR_NT_ROOT) {
+               if (btr_eof(btrn))
+                       return -E_BTR_EOF;
+               iqs = btr_get_input_queue_size(btrn);
+               if (iqs == 0) /* we have a parent, because not eof */
+                       return 0;
+               if (iqs < min_iqs && !btr_no_parent(btrn))
+                       return 0;
+       }
        return 1;
 }
index d02eefc673b73ad8ca7c65c35d77e6e897588384..903996860c56750d23fd53b629ed58fd03e58296 100644 (file)
@@ -4,6 +4,12 @@ struct btr_node;
 typedef int (*btr_command_handler)(struct btr_node *btrn,
                const char *command, char **result);
 
+enum btr_node_type {
+       BTR_NT_ROOT,
+       BTR_NT_INTERNAL,
+       BTR_NT_LEAF,
+};
+
 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
                btr_command_handler handler, void *context);
 void btr_remove_node(struct btr_node *btrn);
@@ -25,4 +31,5 @@ bool btr_eof(struct btr_node *btrn);
 void btr_log_tree(struct btr_node *btrn, int loglevel);
 int btr_pushdown_one(struct btr_node *btrn);
 bool btr_inplace_ok(struct btr_node *btrn);
-int btr_node_status(struct btr_node *btrn, size_t min_iqs);
+int btr_node_status(struct btr_node *btrn, size_t min_iqs,
+               enum btr_node_type type);
index 7c392746c5c1e02bc03408876e8af65f1a8118da..14df8d966128556e32957749b91bdd225730b1c1 100644 (file)
@@ -103,7 +103,7 @@ static void compress_post_select(__a_unused struct sched *s, struct task *t)
        //inplace = false;
 next_buffer:
        t->error = 0;
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)
diff --git a/error.h b/error.h
index 237fbc23b8efbef763577215330db681898cff7b..538c757ae2f5bdc53ca3d6b13cfb7845998d36d1 100644 (file)
--- a/error.h
+++ b/error.h
@@ -39,6 +39,7 @@ extern const char **para_errlist[];
 
 #define BUFFER_TREE_ERRORS \
        PARA_ERROR(BTR_EOF, "buffer tree: end of file"), \
+       PARA_ERROR(BTR_NO_CHILD, "btr node has no children"), \
 
 #define STDOUT_ERRORS \
        PARA_ERROR(STDOUT_EOF, "stdout: end of file"), \
@@ -225,7 +226,6 @@ extern const char **para_errlist[];
 
 #define STDIN_ERRORS \
        PARA_ERROR(STDIN_EOF, "end of file"), \
-       PARA_ERROR(STDIN_NO_CHILD, "stdin btr node has no children"), \
 
 
 
@@ -248,7 +248,6 @@ extern const char **para_errlist[];
 
 #define HTTP_RECV_ERRORS \
        PARA_ERROR(HTTP_RECV_OVERRUN, "http_recv: output buffer overrun"), \
-       PARA_ERROR(HTTP_RECV_NO_CHILD, "http_recv btr node has no children"), \
 
 
 #define RECV_COMMON_ERRORS \
index 8baaa8420a4aea56b0b315956fda7f5d9e0c7698..4c043864620c49ea3ee307f8c788f8e46a923969 100644 (file)
@@ -96,7 +96,7 @@ static void file_write_pre_select_btr(struct sched *s, struct task *t)
 
        t->error = 0;
        pfwd->check_fd = 0;
-       ret = btr_node_status(wn->btrn, wn->min_iqs);
+       ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
        if (ret > 0) {
                para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
                pfwd->check_fd = 1;
@@ -147,7 +147,7 @@ static void file_write_post_select_btr(__a_unused struct sched *s,
        size_t bytes;
 
        t->error = 0;
-       ret = btr_node_status(btrn, wn->min_iqs);
+       ret = btr_node_status(btrn, wn->min_iqs, BTR_NT_LEAF);
        if (ret == 0)
                return;
        if (ret < 0)
index acd84d5cd6332e3d4a2c5507ecc661f1822ba0e1..a4f8868b6355a66d92eecd5bc4ebb10f282c27f1 100644 (file)
@@ -281,7 +281,7 @@ void generic_filter_pre_select(struct sched *s, struct task *t)
        struct filter_node *fn = container_of(t, struct filter_node, task);
 
        t->error = 0;
-       if (btr_node_status(fn->btrn, fn->min_iqs) != 0) {
+       if (btr_node_status(fn->btrn, fn->min_iqs, BTR_NT_INTERNAL) != 0) {
                s->timeout.tv_sec = 0;
                s->timeout.tv_usec = 1;
        }
index 8f4ca2b57b46362119f81277f09720de2e4c38fe..20e00033abf01ae7013e59117f1a10bafa592f05 100644 (file)
@@ -97,18 +97,27 @@ static void http_recv_pre_select(struct sched *s, struct task *t)
                para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
 }
 
-#define HTTP_RECV_MAX_PENDING (1024 * 1024)
 #define HTTP_RECV_READ_BUF_SIZE 4096
 
 static void http_recv_post_select(struct sched *s, struct task *t)
 {
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_http_recv_data *phd = rn->private_data;
-       struct http_recv_args_info *conf = rn->conf;
+       struct btr_node *btrn = rn->btrn;
+       int ret;
 
-       if (rn->output_error && *rn->output_error < 0) {
-               t->error = *rn->output_error;
-               goto err;
+       t->error = 0;
+       if (btrn) {
+               ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
+               if (ret < 0)
+                       goto err;
+               if (ret == 0)
+                       return;
+       } else {
+               if (rn->output_error && *rn->output_error < 0) {
+                       ret = *rn->output_error;
+                       goto err;
+               }
        }
        if (phd->status == HTTP_CONNECTED) {
                char *rq;
@@ -116,9 +125,9 @@ static void http_recv_post_select(struct sched *s, struct task *t)
                        return;
                rq = make_request_msg();
                PARA_INFO_LOG("sending http request\n");
-               t->error = send_va_buffer(phd->fd, "%s", rq);
+               ret = send_va_buffer(phd->fd, "%s", rq);
                free(rq);
-               if (t->error < 0)
+               if (ret < 0)
                        goto err;
                phd->status = HTTP_SENT_GET_REQUEST;
                return;
@@ -126,51 +135,42 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        if (!FD_ISSET(phd->fd, &s->rfds))
                return;
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               t->error = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
-               if (t->error < 0)
+               ret = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
+               if (ret < 0)
                        goto err;
                PARA_INFO_LOG("received ok msg, streaming\n");
                phd->status = HTTP_STREAMING;
                return;
        }
-       if (conf->buffer_tree_given) {
+       if (btrn) {
                char *buf;
 
-               if (btr_no_children(rn->btrn)) {
-                       t->error = -E_HTTP_RECV_NO_CHILD;
-                       goto err;
-               }
-               if (btr_bytes_pending(rn->btrn) > HTTP_RECV_MAX_PENDING) {
-                       t->error = -E_HTTP_RECV_OVERRUN;
-                       goto err;
-               }
                buf = para_malloc(HTTP_RECV_READ_BUF_SIZE);
-               t->error = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE);
-               if (t->error == 0)
-                       t->error = -E_RECV_EOF;
-               if (t->error < 0) {
+               ret = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE);
+               if (ret == 0)
+                       ret = -E_RECV_EOF;
+               if (ret < 0) {
                        free(buf);
                        goto err;
                }
-               btr_add_output(buf, t->error, rn->btrn);
+               btr_add_output(buf, ret, btrn);
                return;
        }
-       t->error = -E_HTTP_RECV_OVERRUN;
+       ret = -E_HTTP_RECV_OVERRUN;
        if (rn->loaded >= BUFSIZE)
                goto err;
-       t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+       ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
                BUFSIZE - rn->loaded);
-       if (t->error == 0)
-               t->error = -E_RECV_EOF;
-       if (t->error < 0)
+       if (ret == 0)
+               ret = -E_RECV_EOF;
+       if (ret < 0)
                goto err;
-       rn->loaded += t->error;
+       rn->loaded += ret;
        return;
 err:
-       if (conf->buffer_tree_given) {
+       if (btrn)
                btr_remove_node(rn->btrn);
-               rn->btrn = NULL;
-       }
+       t->error = ret;
 }
 
 static void http_recv_close(struct receiver_node *rn)
index 2383fba669c58ef743c40f12997f7b734d4d1451..397a2420c33e2a7bbeadb2ee641e933b79bb9b8a 100644 (file)
@@ -199,7 +199,7 @@ next_buffer:
        iqs = btr_get_input_queue_size(btrn);
        if (need_bad_data_delay(pmd, iqs))
                return;
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)
index 956af85223a1e101fd9b290d84487fc74acae46a..4b6232b97840f5a66fe5e7ed07f8c8bb79b4a898 100644 (file)
@@ -191,7 +191,7 @@ static void ogg_post_select(__a_unused struct sched *s, struct task *t)
        char *in;
 
        t->error = 0;
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)
index f8feb43a03e90ebba8f1bed2a1f5f0c292501d81..20b05009d38b08c10b06b64ebf44777a9d5a803d 100644 (file)
@@ -61,7 +61,7 @@ static void oss_pre_select_btr(struct sched *s, struct task *t)
 {
        struct writer_node *wn = container_of(t, struct writer_node, task);
        struct private_oss_write_data *powd = wn->private_data;
-       int ret = btr_node_status(wn->btrn, wn->min_iqs);
+       int ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF);
 
        t->error = 0;
        if (ret < 0)
@@ -209,7 +209,7 @@ static void oss_post_select_btr(__a_unused struct sched *s,
        struct private_oss_write_data *powd = wn->private_data;
        struct btr_node *btrn = wn->btrn;
        size_t frames, bytes;
-       int ret = btr_node_status(btrn, wn->min_iqs);
+       int ret = btr_node_status(btrn, wn->min_iqs, BTR_NT_LEAF);
        char *data;
 
        if (ret < 0)
diff --git a/stdin.c b/stdin.c
index 079865f7ac982bbcdf8103a5418e58a0d05e25b2..8ef325b0fa9fea40ed18e40e96205a10851f6434 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -46,18 +46,18 @@ static void stdin_pre_select(struct sched *s, struct task *t)
        para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
 }
 
-#define STDIN_MAX_PENDING (100 * 1024)
-
 static void stdin_pre_select_btr(struct sched *s, struct task *t)
 {
        struct stdin_task *sit = container_of(t, struct stdin_task, task);
+       int ret;
 
        t->error = 0;
-       if (btr_bytes_pending(sit->btrn) > STDIN_MAX_PENDING)
-               sit->check_fd = 0;
-       else {
-               sit->check_fd = 1;
+       ret = btr_node_status(sit->btrn, 0, BTR_NT_ROOT);
+       if (ret > 0)
                para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+       else if (ret < 0) {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 1;
        }
 }
 
@@ -102,16 +102,14 @@ static void stdin_post_select_btr(struct sched *s, struct task *t)
        ssize_t ret;
        char *buf = NULL;
 
-       t->error = -E_STDIN_NO_CHILD;
-       if (btr_no_children(sit->btrn))
-               goto err;
-
        t->error = 0;
-       if (!sit->check_fd)
+       ret = btr_node_status(sit->btrn, 0, BTR_NT_ROOT);
+       if (ret < 0)
+               goto err;
+       if (ret == 0)
                return;
        if (!FD_ISSET(STDIN_FILENO, &s->rfds))
                return;
-
        buf = para_malloc(STDIN_INPUT_BUFFER_SIZE);
        ret = read(STDIN_FILENO, buf, STDIN_INPUT_BUFFER_SIZE);
        //PARA_CRIT_LOG("read ret: %d\n", ret);
@@ -126,6 +124,7 @@ static void stdin_post_select_btr(struct sched *s, struct task *t)
 err:
        free(buf);
        btr_remove_node(sit->btrn);
+       t->error = ret;
 }
 
 /**
index bf4a923de06c466c0234bae9da301f1b294f11cb..16e16465ad2d9671c6d9e818f10caed6aae2cb91 100644 (file)
--- a/stdout.c
+++ b/stdout.c
@@ -48,19 +48,17 @@ static void stdout_pre_select(struct sched *s, struct task *t)
 static void stdout_pre_select_btr(struct sched *s, struct task *t)
 {
        struct stdout_task *sot = container_of(t, struct stdout_task, task);
-       size_t sz = btr_get_input_queue_size(sot->btrn);
+       int ret;
 
        t->error = 0;
        sot->check_fd = 0;
-       if (sz == 0) {
-               if (btr_no_parent(sot->btrn)) {
-                       s->timeout.tv_sec = 0;
-                       s->timeout.tv_usec = 1;
-               }
-               return;
+       ret = btr_node_status(sot->btrn, 0, BTR_NT_LEAF);
+       if (ret > 0)
+               para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+       else if (ret < 0) {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 1;
        }
-       sot->check_fd = 1;
-       para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
 }
 
 /**
@@ -100,32 +98,29 @@ static void stdout_post_select(struct sched *s, struct task *t)
 static void stdout_post_select_btr(struct sched *s, struct task *t)
 {
        struct stdout_task *sot = container_of(t, struct stdout_task, task);
-       ssize_t ret;
-       size_t sz = btr_get_input_queue_size(sot->btrn);
+       struct btr_node *btrn = sot->btrn;
+       int ret;
        char *buf;
+       size_t sz;
 
        t->error = 0;
-       if (!sot->check_fd) {
-               if (sz == 0 && btr_no_parent(sot->btrn)) {
-                       t->error = -E_STDOUT_EOF;
-                       goto err;
-               }
+       ret = btr_node_status(btrn, 0, BTR_NT_LEAF);
+       if (ret < 0)
+               goto err;
+       if (ret == 0)
                return;
-       }
        if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
                return;
-       sz = btr_next_buffer(sot->btrn, &buf);
-       if (sz == 0)
-               return;
+       sz = btr_next_buffer(btrn, &buf);
+       assert(sz > 0);
        ret = write_nonblock(STDOUT_FILENO, buf, sz, 0);
-       if (ret < 0) {
-               t->error = -ERRNO_TO_PARA_ERROR(errno);
+       if (ret < 0)
                goto err;
-       }
-       btr_consume(sot->btrn, ret);
+       btr_consume(btrn, ret);
        return;
 err:
-       btr_remove_node(sot->btrn);
+       btr_remove_node(btrn);
+       t->error = ret;
 }
 /**
  * Initialize a stdout task structure with default values.
index 7c25a74a4358cd76bfcaf88e8a1d8d7e502a75b8..00bdd6eb79b89c984674f4482c0f006ad89c13bb 100644 (file)
@@ -1239,7 +1239,7 @@ static void wmadec_post_select(__a_unused struct sched *s, struct task *t)
 next_buffer:
        converted = 0;
        t->error = 0;
-       ret = btr_node_status(btrn, fn->min_iqs);
+       ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
        if (ret < 0)
                goto err;
        if (ret == 0)