Make para_recv use the new scheduler.
[paraslash.git] / write_common.c
index 6ff778a92d0451ab0500c2dc8b88fbff02e30323..0b2772ae8ea4d6928b828aa8bb2e2c5916833309 100644 (file)
 const char *writer_names[] ={WRITER_NAMES};
 struct writer writers[NUM_SUPPORTED_WRITERS] = {WRITER_ARRAY};
 
-int wng_write(struct writer_node_group *g, char *buf, size_t *loaded)
-{
-       int ret, i, need_more_writes = 1;
-       size_t min_written = 0;
-
-       while (need_more_writes) {
-               need_more_writes = 0;
-               FOR_EACH_WRITER_NODE(i, g) {
-                       size_t w = g->written[i];
-                       int bytes_to_write;
-                       struct writer_node *wn = &g->writer_nodes[i];
-                       if (!i)
-                               min_written = w;
-                       else
-                               min_written = PARA_MIN(min_written, w);
-                       if (w == *loaded)
-                               continue;
-                       if (!g->eof && (*loaded < wn->chunk_bytes + w))
-                               continue;
-                       bytes_to_write = PARA_MIN(wn->chunk_bytes,
-                               *loaded - w);
-                       ret = wn->writer->write(buf + w, bytes_to_write, wn);
-                       if (ret < 0)
-                               goto out;
-                       if (ret != bytes_to_write)
-                               PARA_WARNING_LOG("short write: %d/%d\n", ret,
-                                       bytes_to_write);
-                       g->written[i] += ret;
-                       need_more_writes = 1;
-               }
-       }
-       *loaded -= min_written;
-       ret = 0;
-       if (g->eof)
-               goto out;
-       if (*loaded)
-               memmove(buf, buf + min_written, *loaded);
-       FOR_EACH_WRITER_NODE(i, g)
-               g->written[i] -= min_written;
-       ret = 1;
-out:
-       return ret;
-}
-
 static void wng_post_select(struct sched *s, struct task *t)
 {
        struct writer_node_group *g = t->private_data;
@@ -89,8 +45,8 @@ static void wng_post_select(struct sched *s, struct task *t)
                        min_written = PARA_MIN(min_written, t->ret);
        }
        *g->loaded -= min_written;
-       if (!*g->loaded && g->eof)
-               t->ret = 0;
+       if (!*g->loaded && *g->eof)
+               t->ret = -E_WNG_EOF;
        else
                t->ret = 1;
        if (*g->loaded && min_written)
@@ -141,11 +97,11 @@ void wng_close(struct writer_node_group *g)
        }
 }
 
-static void wng_error_handler(struct task *t)
+static void wng_event_handler(struct task *t)
 {
        struct writer_node_group *g = t->private_data;
 
-       PARA_INFO_LOG("%p: ret = %d\n", t, t->ret);
+       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
        wng_close(g);
        wng_destroy(g);
@@ -160,8 +116,8 @@ struct writer_node_group *wng_new(unsigned num_writers)
        g->written = para_calloc(num_writers * sizeof(size_t));
        g->task.private_data = g;
        g->task.post_select = wng_post_select;
-       g->task.error_handler = wng_error_handler;
-       g->task.flags = POST_ADD_TAIL | POST_EOF_IS_ERROR;
+       g->task.event_handler = wng_event_handler;
+       g->task.flags = POST_ADD_TAIL;
        return g;
 }
 
@@ -201,6 +157,8 @@ struct writer_node_group *setup_default_wng(void)
        else
                default_writer = 1;
        wng->writer_nodes[0].writer = &writers[default_writer];
+       sprintf(wng->writer_nodes[0].task.status, "%s",
+               writer_names[default_writer]);
        PARA_INFO_LOG("using default writer: %s\n",
                writer_names[default_writer]);
        return wng;