X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=file_write.c;h=7a24b3ac6b78e9d56b1f194c0479c3584cabbfb3;hp=9a4f0707f6f213f1fc88346b0aa2c039bebfa003;hb=c282c836791cedf57c128555af90af37c7c01c05;hpb=11e68b8de3eb8bf8b657333d5b8359260559e93b diff --git a/file_write.c b/file_write.c index 9a4f0707..7a24b3ac 100644 --- a/file_write.c +++ b/file_write.c @@ -10,12 +10,15 @@ #include #include #include +#include #include "para.h" #include "list.h" #include "sched.h" #include "ggo.h" +#include "buffer_tree.h" #include "write.h" +#include "write_common.h" #include "string.h" #include "fd.h" #include "file_write.cmdline.h" @@ -85,6 +88,24 @@ static int file_write_pre_select(struct sched *s, struct writer_node *wn) return 1; } +static void file_write_pre_select_btr(struct sched *s, struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + int ret; + + t->error = 0; + pfwd->check_fd = 0; + ret = btr_node_status(wn->btrn, wn->min_iqs, BTR_NT_LEAF); + if (ret > 0) { + para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + pfwd->check_fd = 1; + } else if (ret < 0) { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } +} + static int file_write_post_select(struct sched *s, struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; @@ -109,10 +130,45 @@ static int file_write_post_select(struct sched *s, struct writer_node *wn) static void file_write_close(struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; + close(pfwd->fd); + file_cmdline_parser_free(wn->conf); free(pfwd); } +static void file_write_post_select_btr(__a_unused struct sched *s, + struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + struct btr_node *btrn = wn->btrn; + int ret; + char *buf; + size_t bytes; + + t->error = 0; + ret = btr_node_status(btrn, wn->min_iqs, BTR_NT_LEAF); + if (ret == 0) + return; + if (ret < 0) + goto err; + if (!pfwd->check_fd) + return; + if (!FD_ISSET(pfwd->fd, &s->wfds)) + return; + bytes = btr_next_buffer(btrn, &buf); + assert(bytes > 0); + //PARA_INFO_LOG("writing %zu\n", bytes); + ret = write(pfwd->fd, buf, bytes); + if (ret < 0) + goto err; + btr_consume(btrn, ret); + return; +err: + assert(ret < 0); + t->error = ret; +} + __malloc static void *file_write_parse_config(const char *options) { struct file_write_args_info *conf @@ -126,6 +182,11 @@ __malloc static void *file_write_parse_config(const char *options) return NULL; } +static void file_write_free_config(void *conf) +{ + file_cmdline_parser_free(conf); +} + /** the init function of the file writer */ void file_write_init(struct writer *w) { @@ -134,8 +195,11 @@ void file_write_init(struct writer *w) file_cmdline_parser_init(&dummy); w->open = file_write_open; w->pre_select = file_write_pre_select; + w->pre_select_btr = file_write_pre_select_btr; w->post_select = file_write_post_select; + w->post_select_btr = file_write_post_select_btr; w->parse_config = file_write_parse_config; + w->free_config = file_write_free_config; w->close = file_write_close; w->shutdown = NULL; /* nothing to do */ w->help = (struct ggo_help) {