close_callbacks(fn);
PARA_INFO_LOG("closing %s filter\n", f->name);
f->close(fn);
+ free(fn->conf);
}
free(fc->filter_nodes);
}
}
-/** 640K ought to be enough for everybody ;) */
-#define FILTER_MAX_PENDING (640 * 1024)
-
-int prepare_filter_node(struct filter_node *fn)
-{
- struct btr_node *btrn = fn->btrn;
- size_t iqs;
-
- if (btr_eof(btrn))
- return -E_FC_EOF;
- if (btr_bytes_pending(btrn) > FILTER_MAX_PENDING)
- return 0;
- iqs = btr_get_input_queue_size(btrn);
- if (iqs < fn->min_iqs && !btr_no_parent(btrn))
- return 0;
- assert(iqs != 0);
- /* avoid "buffer too small" errors from the decoder */
- btr_merge(btrn, fn->min_iqs);
- return 1;
-}
-
void generic_filter_pre_select(struct sched *s, struct task *t)
{
struct filter_node *fn = container_of(t, struct filter_node, task);
- size_t iqs = btr_get_input_queue_size(fn->btrn);
t->error = 0;
- if (iqs <= fn->min_iqs)
- return;
- if (btr_bytes_pending(fn->btrn) > FILTER_MAX_PENDING)
- return; /* FIXME, should use reasonable bound on timeout */
- s->timeout.tv_sec = 0;
- s->timeout.tv_usec = 1;
+ if (btr_node_status(fn->btrn, fn->min_iqs, BTR_NT_INTERNAL) != 0) {
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
+ }
}