s->fc->inbuf = s->receiver_node->buf;
s->fc->in_loaded = &s->receiver_node->loaded;
s->fc->input_eof = &s->receiver_node->eof;
-
s->fc->task.pre_select = filter_pre_select;
s->fc->task.event_handler = filter_event_handler;
s->fc->task.private_data = s->fc;
s->fc->task.flags = 0;
s->fc->eof = 0;
+
+ s->receiver_node->output_eof = &s->fc->eof;
sprintf(s->fc->task.status, "filter chain");
for (i = 0; i < nf; i++) {
struct filter_node *fn = para_calloc(sizeof(struct filter_node));
static void wng_event_handler(struct task *t)
{
- PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
- unregister_task(t);
+ struct writer_node_group *wng = t->private_data;
+ int i;
+
+ wng_unregister(wng);
+ FOR_EACH_SLOT(i) {
+ struct slot_info *s = &slot[i];
+ if (s->wng != wng)
+ continue;
+// if (s->fc)
+// s->fc->eof = 1;
+// if (s->receiver_node)
+// s->receiver_node->eof = 1;
+ PARA_INFO_LOG("slot %d: %s\n", i, PARA_STRERROR(-t->ret));
+ }
}
static void open_writers(int slot_num)
return;
PARA_INFO_LOG("closing slot %d \n", slot_num);
wng_close(s->wng);
- wng_destroy(s->wng);
close_filters(s->fc);
free(s->fc);
close_receiver(slot_num);