X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=filter_common.c;h=0c92f42c1e4b96760667e63a9dbc65b6dbf2d2a6;hp=a27c2d303ad28d823ae9af20eeb4878f9300cad1;hb=8b280eb4e3b4f46a95eef251520bcce3c761c9ad;hpb=e16a8a93779d7a94cf0c716f6dab3b42651d0205 diff --git a/filter_common.c b/filter_common.c index a27c2d30..0c92f42c 100644 --- a/filter_common.c +++ b/filter_common.c @@ -70,90 +70,6 @@ static void close_callbacks(struct filter_node *fn) } } -static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen, - char *outbuf, size_t outlen) -{ - struct filter_callback *fcb, *tmp; - list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node) { - int ret; - if (inlen && fcb->input_cb) { - ret = fcb->input_cb(inbuf, inlen, fcb); - if (ret < 0) { - close_filter_callback(fcb); - continue; - } - } - if (!outlen || !fcb->output_cb) - continue; - ret = fcb->output_cb(outbuf, outlen, fcb); - if (ret < 0) - close_filter_callback(fcb); - } -} - -/** - * Call the convert function of each filter. - * - * \param s Unused. - * \param t The task identifying the filter chain. - * - * This is the core function of the filter subsystem. It loops over the list of - * filter nodes determined by \a t and calls the filter's convert function if - * there is input available for the filter node in question. If the convert - * function consumed some or all of its input data, all registered input - * callbacks are called. Similarly, if a convert function produced output, all - * registered output callbacks get called. - * - * On errors a (negative) error code is stored in t->error. - * - * \sa filter_node, filter#convert, filter_callback. - */ -void filter_post_select(__a_unused struct sched *s, struct task *t) -{ - struct filter_chain *fc = container_of(t, struct filter_chain, task); - struct filter_node *fn; - char *ib; - size_t *loaded; - int i, conv, conv_total = 0; - - if (fc->output_error && *fc->output_error < 0) { - t->error = *fc->output_error; - return; - } -again: - ib = *fc->inbufp; - loaded = fc->in_loaded; - conv = 0; - FOR_EACH_FILTER_NODE(fn, fc, i) { - struct filter *f = filters + fn->filter_num; - if (fn->loaded < fn->bufsize) { - size_t size, old_fn_loaded = fn->loaded; - t->error = f->convert(ib, *loaded, fn); - if (t->error < 0) - return; - size = t->error; - call_callbacks(fn, ib, size, fn->buf + old_fn_loaded, - fn->loaded - old_fn_loaded); - *loaded -= size; - conv += size + fn->loaded - old_fn_loaded; - if (*loaded && size) - memmove(ib, ib + size, *loaded); - } - ib = fn->buf; - loaded = &fn->loaded; - } - conv_total += conv; - if (conv) - goto again; - if (*fc->input_error >= 0) - return; - if (*fc->out_loaded) - return; - if (*fc->in_loaded && conv_total) - return; - t->error = -E_FC_EOF; -} - /** * Close all filter nodes and their callbacks. * @@ -179,6 +95,7 @@ void close_filters(struct filter_chain *fc) close_callbacks(fn); PARA_INFO_LOG("closing %s filter\n", f->name); f->close(fn); + free(fn->conf); } free(fc->filter_nodes); } @@ -276,38 +193,14 @@ void print_filter_helps(int detailed) } -/** 640K ought to be enough for everybody ;) */ -#define FILTER_MAX_PENDING (640 * 1024) - -int prepare_filter_node(struct filter_node *fn) -{ - struct btr_node *btrn = fn->btrn; - size_t iqs; - - if (btr_eof(btrn)) - return -E_FC_EOF; - if (btr_bytes_pending(btrn) > FILTER_MAX_PENDING) - return 0; - iqs = btr_get_input_queue_size(btrn); - if (iqs < fn->min_iqs && !btr_no_parent(btrn)) - return 0; - assert(iqs != 0); - /* avoid "buffer too small" errors from the decoder */ - btr_merge(btrn, fn->min_iqs); - return 1; -} - void generic_filter_pre_select(struct sched *s, struct task *t) { struct filter_node *fn = container_of(t, struct filter_node, task); - size_t iqs = btr_get_input_queue_size(fn->btrn); t->error = 0; - if (iqs < fn->min_iqs) - return; - if (btr_bytes_pending(fn->btrn) > FILTER_MAX_PENDING) - return; /* FIXME, should use reasonable bound on timeout */ - s->timeout.tv_sec = 0; - s->timeout.tv_usec = 1; + if (btr_node_status(fn->btrn, fn->min_iqs, BTR_NT_INTERNAL) != 0) { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } }