X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=file_write.c;h=a9635ca2d558c3c541aca0c73bde435f6ef02acb;hp=9a4f0707f6f213f1fc88346b0aa2c039bebfa003;hb=d0f36435b0f81368a778fda33f3a7df86830f5ac;hpb=ef81b9f4f0fa6a26043c68d429c0deeb7c949351 diff --git a/file_write.c b/file_write.c index 9a4f0707..a9635ca2 100644 --- a/file_write.c +++ b/file_write.c @@ -10,12 +10,15 @@ #include #include #include +#include #include "para.h" #include "list.h" #include "sched.h" #include "ggo.h" +#include "buffer_tree.h" #include "write.h" +#include "write_common.h" #include "string.h" #include "fd.h" #include "file_write.cmdline.h" @@ -85,6 +88,25 @@ static int file_write_pre_select(struct sched *s, struct writer_node *wn) return 1; } +static void file_write_pre_select_btr(struct sched *s, struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + int ret; + + t->error = 0; + pfwd->check_fd = 0; + ret = btr_node_status(wn->btrn, wn->min_iqs); + if (ret >= 0) { + para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + pfwd->check_fd = 1; + } + if (ret != 0) { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 1; + } +} + static int file_write_post_select(struct sched *s, struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; @@ -109,10 +131,45 @@ static int file_write_post_select(struct sched *s, struct writer_node *wn) static void file_write_close(struct writer_node *wn) { struct private_file_write_data *pfwd = wn->private_data; + close(pfwd->fd); + file_cmdline_parser_free(wn->conf); free(pfwd); } +static void file_write_post_select_btr(__a_unused struct sched *s, + struct task *t) +{ + struct writer_node *wn = container_of(t, struct writer_node, task); + struct private_file_write_data *pfwd = wn->private_data; + struct btr_node *btrn = wn->btrn; + int ret; + char *buf; + size_t bytes; + + t->error = 0; + ret = btr_node_status(btrn, wn->min_iqs); + if (ret == 0) + return; + if (ret < 0) + goto err; + if (!pfwd->check_fd) + return; + if (!FD_ISSET(pfwd->fd, &s->wfds)) + return; + bytes = btr_next_buffer(btrn, &buf); + assert(bytes > 0); + //PARA_INFO_LOG("writing %zu\n", bytes); + ret = write(pfwd->fd, buf, bytes); + if (ret < 0) + goto err; + btr_consume(btrn, ret); + return; +err: + assert(ret < 0); + t->error = ret; +} + __malloc static void *file_write_parse_config(const char *options) { struct file_write_args_info *conf @@ -134,7 +191,9 @@ void file_write_init(struct writer *w) file_cmdline_parser_init(&dummy); w->open = file_write_open; w->pre_select = file_write_pre_select; + w->pre_select_btr = file_write_pre_select_btr; w->post_select = file_write_post_select; + w->post_select_btr = file_write_post_select_btr; w->parse_config = file_write_parse_config; w->close = file_write_close; w->shutdown = NULL; /* nothing to do */