Simplify the scheduling code.
authorAndre Noll <maan@systemlinux.org>
Sun, 30 Mar 2008 20:56:16 +0000 (22:56 +0200)
committerAndre Noll <maan@systemlinux.org>
Sun, 30 Mar 2008 20:56:16 +0000 (22:56 +0200)
- Get rid of sched->select_ret. Tasks shouldn't care about the
return value of the select call.

- Kill task->private_data. Use container_of() instead.

- Remove task->event_handler. It is never necessary and only
makes the code more convoluted. The scheduler unregisters
tasks as soon as either the pre_select or the post_select
functions return an error.

- Rename task->ret to task->error and get rid of a couple
of error fields in other structs that usually only contained
a copy of the task's error value.

This conversion likely introduces many bugs that have to be
shaken out in subsequent patches. Hopefully it will result in
less error-prone code in the long run.

24 files changed:
afs.c
audiod.c
audiod.h
client.c
client.h
client_common.c
dccp_recv.c
error.h
filter.c
filter.h
filter_chain.c
http_recv.c
ortp_recv.c
recv.c
recv.h
sched.c
sched.h
stdin.c
stdin.h
stdout.c
stdout.h
write.c
write.h
write_common.c

diff --git a/afs.c b/afs.c
index 451a492..ab1a1df 100644 (file)
--- a/afs.c
+++ b/afs.c
@@ -755,34 +755,29 @@ static void unregister_tasks(void)
 
 static void signal_pre_select(struct sched *s, struct task *t)
 {
-       struct signal_task *st = t->private_data;
-       t->ret = 1;
+       struct signal_task *st = container_of(t, struct signal_task, task);
        para_fd_set(st->fd, &s->rfds, &s->max_fileno);
 }
 
 static void signal_post_select(struct sched *s, struct task *t)
 {
-       struct signal_task *st = t->private_data;
-       t->ret = -E_AFS_PARENT_DIED;
-       if (getppid() == 1)
-               goto err;
-       t->ret = 1;
+       struct signal_task *st = container_of(t, struct signal_task, task);
+       if (getppid() == 1) {
+               t->error = -E_AFS_PARENT_DIED;
+               return;
+       }
        if (!FD_ISSET(st->fd, &s->rfds))
                return;
        st->signum = para_next_signal();
-       t->ret = 1;
        if (st->signum == SIGHUP) {
                close_afs_tables();
-               t->ret = open_afs_tables();
-               if (t->ret < 0)
-                       goto err;
+               t->error = open_afs_tables();
+               if (t->error < 0)
+                       return;
                init_admissible_files(current_mop);
                return;
        }
-       t->ret = -E_AFS_SIGNAL;
-err:
-       PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-       unregister_tasks();
+       t->error = -E_AFS_SIGNAL;
 }
 
 static void register_signal_task(void)
@@ -805,7 +800,6 @@ static void register_signal_task(void)
 
        st->task.pre_select = signal_pre_select;
        st->task.post_select = signal_post_select;
-       st->task.private_data = st;
        sprintf(st->task.status, "signal task");
        register_task(&st->task);
 }
@@ -824,14 +818,13 @@ struct afs_client {
 
 static void command_pre_select(struct sched *s, struct task *t)
 {
-       struct command_task *ct = t->private_data;
+       struct command_task *ct = container_of(t, struct command_task, task);
        struct afs_client *client;
 
        para_fd_set(server_socket, &s->rfds, &s->max_fileno);
        para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
        list_for_each_entry(client, &afs_client_list, node)
                para_fd_set(client->fd, &s->rfds, &s->max_fileno);
-       t->ret = 1;
 }
 
 /**
@@ -966,10 +959,10 @@ err:
 
 static void command_post_select(struct sched *s, struct task *t)
 {
-       struct command_task *ct = t->private_data;
+       struct command_task *ct = container_of(t, struct command_task, task);
        struct sockaddr_un unix_addr;
        struct afs_client *client, *tmp;
-       int fd;
+       int fd, ret;
        if (FD_ISSET(server_socket, &s->rfds))
                execute_server_command();
 
@@ -990,25 +983,23 @@ static void command_post_select(struct sched *s, struct task *t)
        }
        /* Accept connections on the local socket. */
        if (!FD_ISSET(ct->fd, &s->rfds))
-               goto out;
-       t->ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
-       if (t->ret < 0) {
-               PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-               goto out;
+               return;
+       ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
+       if (ret < 0) {
+               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+               return;
        }
-       fd = t->ret;
-       t->ret = mark_fd_nonblocking(fd);
-       if (t->ret < 0) {
-               PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
+       fd = ret;
+       ret = mark_fd_nonblocking(fd);
+       if (ret < 0) {
+               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
                close(fd);
-               goto out;
+               return;
        }
        client = para_malloc(sizeof(*client));
        client->fd = fd;
        client->connect_time = *now;
        para_list_add(&client->node, &afs_client_list);
-out:
-       t->ret = 1;
 }
 
 static void register_command_task(uint32_t cookie)
@@ -1019,7 +1010,6 @@ static void register_command_task(uint32_t cookie)
 
        ct->task.pre_select = command_pre_select;
        ct->task.post_select = command_post_select;
-       ct->task.private_data = ct;
        sprintf(ct->task.status, "command task");
        register_task(&ct->task);
 }
index 6fd8eef..bde2b1e 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -212,21 +212,21 @@ static void kill_all_decoders(int error)
 
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
-               if (s->wng && !s->wng->error) {
+               if (s->wng && !s->wng->task.error) {
                        PARA_INFO_LOG("unregistering writer node group in slot %d\n",
                                i);
                        wng_unregister(s->wng);
-                       s->wng->error = error;
+                       s->wng->task.error = error;
                }
-               if (s->fc && !s->fc->error) {
+               if (s->fc && !s->fc->task.error) {
                        PARA_INFO_LOG("unregistering filter chain in slot %d\n", i);
                        unregister_task(&s->fc->task);
-                       s->fc->error = error;
+                       s->fc->task.error = error;
                }
-               if (s->receiver_node && !s->receiver_node->error) {
+               if (s->receiver_node && !s->receiver_node->task.error) {
                        PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i);
                        unregister_task(&s->receiver_node->task);
-                       s->receiver_node->error = error;
+                       s->receiver_node->task.error = error;
                }
        }
 }
@@ -264,14 +264,6 @@ int num_filters(int audio_format_num)
        return afi[audio_format_num].num_filters;
 }
 
-static void filter_event_handler(struct task *t)
-{
-       PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-       struct filter_chain *fc = t->private_data;
-       fc->error = t->ret;
-       unregister_task(t);
-}
-
 static void open_filters(int slot_num)
 {
        struct slot_info *s = &slot[slot_num];
@@ -287,13 +279,11 @@ static void open_filters(int slot_num)
        INIT_LIST_HEAD(&s->fc->filters);
        s->fc->inbuf = s->receiver_node->buf;
        s->fc->in_loaded = &s->receiver_node->loaded;
-       s->fc->input_error = &s->receiver_node->error;
+       s->fc->input_error = &s->receiver_node->task.error;
        s->fc->task.pre_select = filter_pre_select;
-       s->fc->task.event_handler = filter_event_handler;
-       s->fc->task.private_data = s->fc;
-       s->fc->error = 0;
+       s->fc->task.error = 0;
 
-       s->receiver_node->output_error = &s->fc->error;
+       s->receiver_node->output_error = &s->fc->task.error;
        sprintf(s->fc->task.status, "filter chain");
        for (i = 0; i < nf; i++) {
                struct filter_node *fn = para_calloc(sizeof(struct filter_node));
@@ -312,15 +302,6 @@ static void open_filters(int slot_num)
        register_task(&s->fc->task);
 }
 
-static void wng_event_handler(struct task *t)
-{
-       struct writer_node_group *wng = t->private_data;
-
-       PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
-       wng->error = t->ret;
-       wng_unregister(wng);
-}
-
 static void open_writers(int slot_num)
 {
        int ret, i;
@@ -335,17 +316,16 @@ static void open_writers(int slot_num)
        if (s->fc) {
                s->wng->buf = s->fc->outbuf;
                s->wng->loaded = s->fc->out_loaded;
-               s->wng->input_error = &s->fc->error;
+               s->wng->input_error = &s->fc->task.error;
                s->wng->channels = &s->fc->channels;
                s->wng->samplerate = &s->fc->samplerate;
-               s->fc->output_error = &s->wng->error;
+               s->fc->output_error = &s->wng->task.error;
                PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate);
        } else {
                s->wng->buf = s->receiver_node->buf;
                s->wng->loaded = &s->receiver_node->loaded;
-               s->wng->input_error = &s->receiver_node->error;
+               s->wng->input_error = &s->receiver_node->task.error;
        }
-       s->wng->task.event_handler = wng_event_handler;
        for (i = 0; i < a->num_writers; i++) {
                s->wng->writer_nodes[i].conf = a->writer_conf[i];
                s->wng->writer_nodes[i].writer = a->writers[i];
@@ -359,6 +339,7 @@ static void open_writers(int slot_num)
        activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters);
 }
 
+#if 0
 static void rn_event_handler(struct task *t)
 {
        struct receiver_node *rn = t->private_data;
@@ -378,6 +359,7 @@ static void rn_event_handler(struct task *t)
                tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier);
        }
 }
+#endif
 
 static int open_receiver(int format)
 {
@@ -405,10 +387,8 @@ static int open_receiver(int format)
        }
        PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
                audio_formats[s->format], a->receiver->name, slot_num);
-       rn->task.private_data = s->receiver_node;
        rn->task.pre_select = a->receiver->pre_select;
        rn->task.post_select = a->receiver->post_select;
-       rn->task.event_handler = rn_event_handler;
        sprintf(rn->task.status, "%s receiver node", rn->receiver->name);
        register_task(&rn->task);
        return 1;
@@ -425,7 +405,7 @@ static int receiver_running(int format)
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
                if (s->format == format && s->receiver_node
-                               && !s->receiver_node->error)
+                               && s->receiver_node->task.error >= 0)
                        return 1;
        }
        return 0;
@@ -436,7 +416,7 @@ static int open_current_receiver(struct sched *s)
        struct timeval diff;
        int cafn = stat_task->current_audio_format_num;
 
-       if (cafn < 0 || !stat_task->pcd)
+       if (cafn < 0 || !stat_task->ct)
                return 0;
        if (receiver_running(cafn))
                return 0;
@@ -496,7 +476,7 @@ static int check_stat_line(char *line, __a_unused void *data)
        long unsigned sec, usec;
        char *tmp;
 
-//     PARA_INFO_LOG("line: %s\n", line);
+       //PARA_INFO_LOG("line: %s\n", line);
        if (!line)
                return 1;
        itemnum = stat_line_valid(line);
@@ -515,6 +495,7 @@ static int check_stat_line(char *line, __a_unused void *data)
        switch (itemnum) {
        case SI_STATUS:
                stat_task->playing = strstr(line, "playing")? 1 : 0;
+               PARA_INFO_LOG("stat task playing: %d\n", stat_task->playing);
                break;
        case SI_OFFSET:
                stat_task->offset_seconds = atoi(line + ilen + 1);
@@ -561,11 +542,11 @@ static void try_to_close_slot(int slot_num)
 
        if (s->format < 0)
                return;
-       if (s->receiver_node && !s->receiver_node->error)
+       if (s->receiver_node && s->receiver_node->task.error >= 0)
                return;
-       if (s->fc && !s->fc->error)
+       if (s->fc && s->fc->task.error >= 0)
                return;
-       if (s->wng && !s->wng->error)
+       if (s->wng && s->wng->task.error >= 0)
                return;
        PARA_INFO_LOG("closing slot %d\n", slot_num);
        wng_close(s->wng);
@@ -586,7 +567,6 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t)
        int i;
        struct timeval min_delay = {0, 1};
 
-       t->ret = 1;
        if (audiod_status != AUDIOD_ON || !stat_task->playing)
                return kill_all_decoders(-E_NOT_PLAYING);
        if (open_current_receiver(s))
@@ -634,7 +614,6 @@ static void audiod_post_select(__a_unused struct sched *s,
 {
        int i;
 
-       t->ret = 1;
        FOR_EACH_SLOT(i)
                try_to_close_slot(i);
 }
@@ -643,8 +622,7 @@ static void init_audiod_task(struct task *t)
 {
        t->pre_select = audiod_pre_select;
        t->post_select = audiod_post_select;
-       t->event_handler = NULL;
-       t->private_data = t;
+       t->error = 0;
        sprintf(t->status, "audiod task");
 }
 
@@ -883,58 +861,48 @@ err:
        exit(EXIT_FAILURE);
 }
 
-static void signal_event_handler(struct task *t)
-{
-       struct signal_task *st = t->private_data;
-
-       switch (st->signum) {
-       case SIGINT:
-       case SIGTERM:
-       case SIGHUP:
-               PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
-               clean_exit(EXIT_FAILURE, "caught deadly signal");
-       }
-}
-
 static void signal_pre_select(struct sched *s, struct task *t)
 {
-       struct signal_task *st = t->private_data;
-       t->ret = 1;
+       struct signal_task *st = container_of(t, struct signal_task, task);
        para_fd_set(st->fd, &s->rfds, &s->max_fileno);
 }
 
 static void signal_post_select(struct sched *s, struct task *t)
 {
-       struct signal_task *st = t->private_data;
-       t->ret = 1;
+       struct signal_task *st = container_of(t, struct signal_task, task);
+       int signum;
+
        if (!FD_ISSET(st->fd, &s->rfds))
                return;
-       t->ret = -E_SIGNAL_CAUGHT;
-       st->signum = para_next_signal();
+
+       signum = para_next_signal();
+       switch (signum) {
+       case SIGINT:
+       case SIGTERM:
+       case SIGHUP:
+               PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+               clean_exit(EXIT_FAILURE, "caught deadly signal");
+       }
 }
 
 static void signal_setup_default(struct signal_task *st)
 {
        st->task.pre_select = signal_pre_select;
        st->task.post_select = signal_post_select;
-       st->task.private_data = st;
        sprintf(st->task.status, "signal task");
 }
 
 static void command_pre_select(struct sched *s, struct task *t)
 {
-       struct command_task *ct = t->private_data;
-       t->ret = 1;
+       struct command_task *ct = container_of(t, struct command_task, task);
        para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
-
 }
 
 static void command_post_select(struct sched *s, struct task *t)
 {
        int ret;
-       struct command_task *ct = t->private_data;
+       struct command_task *ct = container_of(t, struct command_task, task);
 
-       t->ret = 1; /* always successful */
        audiod_status_dump();
        if (!FD_ISSET(ct->fd, &s->rfds))
                return;
@@ -947,8 +915,7 @@ static void init_command_task(struct command_task *ct)
 {
        ct->task.pre_select = command_pre_select;
        ct->task.post_select = command_post_select;
-       ct->task.event_handler = NULL;
-       ct->task.private_data = ct;
+       ct->task.error = 0;
        ct->fd = audiod_get_socket(); /* doesn't return on errors */
        sprintf(ct->task.status, "command task");
 }
@@ -957,10 +924,10 @@ static void close_stat_pipe(void)
 {
        int i;
 
-       if (!stat_task->pcd)
+       if (!stat_task->ct)
                return;
-       client_close(stat_task->pcd);
-       stat_task->pcd = NULL;
+       client_close(stat_task->ct);
+       stat_task->ct = NULL;
        FOR_EACH_STATUS_ITEM(i) {
                free(stat_task->stat_item_values[i]);
                stat_task->stat_item_values[i] = NULL;
@@ -1009,6 +976,7 @@ static void set_stat_task_restart_barrier(void)
        tv_add(now, &delay, &stat_task->restart_barrier);
 }
 
+#if 0
 static void client_task_event_handler(__a_unused struct task *t)
 {
        int i;
@@ -1023,14 +991,14 @@ static void client_task_event_handler(__a_unused struct task *t)
        FOR_EACH_AUDIO_FORMAT(i)
                afi[i].restart_barrier = stat_task->restart_barrier;
 }
+#endif
 
 static void status_pre_select(struct sched *s, struct task *t)
 {
-       struct status_task *st = t->private_data;
+       struct status_task *st = container_of(t, struct status_task, task);
        int ret;
 
-       t->ret = 1; /* always successful */
-       if (st->pcd || audiod_status == AUDIOD_OFF)
+       if (st->ct || audiod_status == AUDIOD_OFF)
                return;
        if (!st->clock_diff_count && tv_diff(now, &st->restart_barrier, NULL)
                        < 0)
@@ -1041,40 +1009,38 @@ static void status_pre_select(struct sched *s, struct task *t)
                if (tv_diff(now, &st->clock_diff_barrier, NULL) < 0)
                        return;
                PARA_INFO_LOG("clock diff count: %d\n", st->clock_diff_count);
-               ret = client_open(argc, argv, &st->pcd);
+               ret = client_open(argc, argv, &st->ct);
 
        } else {
                char *argv[] = {"audiod", "stat", NULL};
                int argc = 2;
-               ret = client_open(argc, argv, &st->pcd);
+               ret = client_open(argc, argv, &st->ct);
        }
        set_stat_task_restart_barrier();
        if (ret < 0)
                return;
-       st->pcd->task.event_handler = client_task_event_handler;
        s->timeout.tv_sec = 0;
        s->timeout.tv_usec = 1;
 }
 
 static void status_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct status_task *st = t->private_data;
+       struct status_task *st = container_of(t, struct status_task, task);
        unsigned bytes_left;
 
-       t->ret = 1;
-       if (!st->pcd || st->pcd->status != CL_RECEIVING)
+       if (!st->ct || st->ct->status != CL_RECEIVING)
                return;
-       if (st->pcd && audiod_status == AUDIOD_OFF) {
-               unregister_task(&st->pcd->task);
+       if (st->ct && audiod_status == AUDIOD_OFF) {
+               unregister_task(&st->ct->task);
                close_stat_pipe();
                st->clock_diff_count = conf.clock_diff_count_arg;
                return;
        }
-       bytes_left = for_each_line(st->pcd->buf, st->pcd->loaded,
+       bytes_left = for_each_line(st->ct->buf, st->ct->loaded,
                &check_stat_line, NULL);
-       if (st->pcd->loaded != bytes_left) {
+       if (st->ct->loaded != bytes_left) {
                st->last_status_read = *now;
-               st->pcd->loaded = bytes_left;
+               st->ct->loaded = bytes_left;
        } else {
                struct timeval diff;
                tv_diff(now, &st->last_status_read, &diff);
@@ -1088,7 +1054,6 @@ static void init_status_task(struct status_task *st)
        memset(st, 0, sizeof(struct status_task));
        st->task.pre_select = status_pre_select;
        st->task.post_select = status_post_select;
-       st->task.private_data = st;
        st->sa_time_diff_sign = 1;
        st->clock_diff_count = conf.clock_diff_count_arg;
        st->current_audio_format_num = -1;
@@ -1164,7 +1129,6 @@ int main(int argc, char *argv[])
        init_grabbing();
        setup_signal_handling();
        signal_setup_default(sig_task);
-       sig_task->task.event_handler = signal_event_handler;
 
        init_status_task(stat_task);
        init_command_task(cmd_task);
index c6746d2..f518a49 100644 (file)
--- a/audiod.h
+++ b/audiod.h
@@ -76,7 +76,7 @@ struct status_task {
        /** the associated task structure of audiod */
        struct task task;
        /** client data associated with the stat task */
-       struct private_client_data *pcd;
+       struct client_task *ct;
        /** the array of status items sent by para_server */
        char *stat_item_values[NUM_STAT_ITEMS];
        /** do not restart client command until this time */
index fec88a6..f47a0be 100644 (file)
--- a/client.c
+++ b/client.c
 
 INIT_CLIENT_ERRLISTS;
 
-static struct private_client_data *pcd;
+static struct client_task *ct;
 static struct stdin_task sit;
 static struct stdout_task sot;
 
-
-INIT_STDERR_LOGGING(pcd->conf.loglevel_arg);
-
-static void client_event_handler(struct task *t)
+static void supervisor_pre_select(struct sched *s, struct task *t)
 {
-       struct private_client_data *p = t->private_data;
-
-       PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-       if (t->ret != -E_HANDSHAKE_COMPLETE) {
-               unregister_task(t);
-               p->error = t->ret;
+       if (ct->task.error < 0) {
+               t->error = ct->task.error;
                return;
        }
-       if (p->status == CL_SENDING) {
+       if (ct->status == CL_SENDING) {
                stdin_set_defaults(&sit);
                sit.buf = para_malloc(sit.bufsize),
                register_task(&sit.task);
-               p->inbuf = sit.buf;
-               p->in_loaded = &sit.loaded;
-               p->in_error = &sit.error;
-               return;
+               ct->inbuf = sit.buf;
+               ct->in_loaded = &sit.loaded;
+               ct->in_error = &sit.task.error;
+               t->error = -1;
+               goto min_delay;
        }
-       stdout_set_defaults(&sot);
-       sot.buf = p->buf;
-       sot.loaded = &p->loaded;
-       sot.input_error = &p->error;
-       register_task(&sot.task);
+       if (ct->status == CL_RECEIVING) {
+               stdout_set_defaults(&sot);
+               sot.buf = ct->buf;
+               sot.loaded = &ct->loaded;
+               sot.input_error = &ct->task.error;
+               register_task(&sot.task);
+               t->error = -1;
+               goto min_delay;
+       }
+       return;
+min_delay:
+       s->timeout.tv_sec = 0;
+       s->timeout.tv_usec = 1;
 }
 
+static struct task svt = {
+       .pre_select = supervisor_pre_select
+};
+
+INIT_STDERR_LOGGING(ct->conf.loglevel_arg);
+
+
 /**
- * the client program to connect to para_server
+ * The client program to connect to para_server.
  *
- * \param argc usual argument count
- * \param argv usual argument vector
+ * \param argc Usual argument count.
+ * \param argv Usual argument vector.
  *
  * It registers two tasks: The client task that communicates with para_server
  * and the standard out task that writes any output produced by the client task
@@ -73,13 +82,13 @@ int main(int argc, char *argv[])
 
        s.default_timeout.tv_sec = 1;
        s.default_timeout.tv_usec = 0;
-       ret = client_open(argc, argv, &pcd);
+       ret = client_open(argc, argv, &ct);
        if (ret < 0) /* can not use PARA_LOG here */
                exit(EXIT_FAILURE);
-       pcd->task.event_handler = client_event_handler;
+       register_task(&svt);
        ret = schedule(&s);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
-       client_close(pcd);
+       client_close(ct);
        return ret >= 0? EXIT_SUCCESS: EXIT_FAILURE;
 }
index 3b115fa..dcec361 100644 (file)
--- a/client.h
+++ b/client.h
@@ -38,7 +38,7 @@ enum {
 /**
  * data specific to a client task
  */
-struct private_client_data {
+struct client_task {
        /** the state of the connection */
        int status;
        /** the file descriptor */
@@ -57,8 +57,6 @@ struct private_client_data {
        RC4_KEY rc4_send_key;
        /** the client task structure */
        struct task task;
-       /** non-zero if task is unregistered */
-       int error;
        /** the buffer used for handshake and receiving */
        char buf[CLIENT_BUFSIZE];
        /** number of bytes loaded in \p buf */
@@ -77,5 +75,5 @@ struct private_client_data {
        int *in_error;
 };
 
-void client_close(struct private_client_data *pcd);
-int client_open(int argc, char *argv[], struct private_client_data **pcd_ptr);
+void client_close(struct client_task *ct);
+int client_open(int argc, char *argv[], struct client_task **ct);
index c25e88f..ee3a13f 100644 (file)
@@ -33,8 +33,8 @@
 static void rc4_send(unsigned long len, const unsigned char *indata,
                unsigned char *outdata, void *private_data)
 {
-       struct private_client_data *pcd = private_data;
-       RC4(&pcd->rc4_send_key, len, indata, outdata);
+       struct client_task *ct = private_data;
+       RC4(&ct->rc4_send_key, len, indata, outdata);
 }
 
 /*
@@ -45,30 +45,30 @@ static void rc4_send(unsigned long len, const unsigned char *indata,
 static void rc4_recv(unsigned long len, const unsigned char *indata,
                unsigned char *outdata, void *private_data)
 {
-       struct private_client_data *pcd = private_data;
-       RC4(&pcd->rc4_recv_key, len, indata, outdata);
+       struct client_task *ct = private_data;
+       RC4(&ct->rc4_recv_key, len, indata, outdata);
 }
 
 /**
  * Close the connection to para_server and free all resources.
  *
- * \param pcd Pointer to the client data.
+ * \param ct Pointer to the client data.
  *
  * \sa client_open.
  */
-void client_close(struct private_client_data *pcd)
+void client_close(struct client_task *ct)
 {
-       if (!pcd)
+       if (!ct)
                return;
-       if (pcd->fd >= 0) {
-               disable_crypt(pcd->fd);
-               close(pcd->fd);
+       if (ct->fd >= 0) {
+               disable_crypt(ct->fd);
+               close(ct->fd);
        }
-       free(pcd->user);
-       free(pcd->config_file);
-       free(pcd->key_file);
-       client_cmdline_parser_free(&pcd->conf);
-       free(pcd);
+       free(ct->user);
+       free(ct->config_file);
+       free(ct->key_file);
+       client_cmdline_parser_free(&ct->conf);
+       free(ct);
 }
 
 /**
@@ -87,43 +87,42 @@ void client_close(struct private_client_data *pcd)
  */
 static void client_pre_select(struct sched *s, struct task *t)
 {
-       struct private_client_data *pcd = t->private_data;
+       struct client_task *ct = container_of(t, struct client_task, task);
 
-       t->ret = 1;
-       pcd->check_r = 0;
-       pcd->check_w = 0;
-       if (pcd->fd < 0)
+       ct->check_r = 0;
+       ct->check_w = 0;
+       if (ct->fd < 0)
                return;
-       switch (pcd->status) {
+       switch (ct->status) {
        case CL_CONNECTED:
        case CL_SENT_AUTH:
        case CL_SENT_CH_RESPONSE:
        case CL_SENT_COMMAND:
-               para_fd_set(pcd->fd, &s->rfds, &s->max_fileno);
-               pcd->check_r = 1;
+               para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+               ct->check_r = 1;
                return;
 
        case CL_RECEIVED_WELCOME:
        case CL_RECEIVED_CHALLENGE:
        case CL_RECEIVED_PROCEED:
-               para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
-               pcd->check_w = 1;
+               para_fd_set(ct->fd, &s->wfds, &s->max_fileno);
+               ct->check_w = 1;
                return;
 
        case CL_RECEIVING:
-               if (pcd->loaded < CLIENT_BUFSIZE - 1) {
-                       para_fd_set(pcd->fd, &s->rfds, &s->max_fileno);
-                       pcd->check_r = 1;
+               if (ct->loaded < CLIENT_BUFSIZE - 1) {
+                       para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+                       ct->check_r = 1;
                }
                return;
        case CL_SENDING:
-               if (*pcd->in_loaded) {
-                       PARA_INFO_LOG("loaded: %zd\n", *pcd->in_loaded);
-                       para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
-                       pcd->check_w = 1;
+               if (*ct->in_loaded) {
+                       PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
+                       para_fd_set(ct->fd, &s->wfds, &s->max_fileno);
+                       ct->check_w = 1;
                } else {
-                       if (*pcd->in_error) {
-                               t->ret = *pcd->in_error;
+                       if (*ct->in_error) {
+                               t->error = *ct->in_error;
                                s->timeout.tv_sec = 0;
                                s->timeout.tv_usec = 1;
                        }
@@ -132,14 +131,14 @@ static void client_pre_select(struct sched *s, struct task *t)
        }
 }
 
-static ssize_t client_recv_buffer(struct private_client_data *pcd)
+static ssize_t client_recv_buffer(struct client_task *ct)
 {
-       ssize_t ret = recv_buffer(pcd->fd, pcd->buf + pcd->loaded,
-               CLIENT_BUFSIZE - pcd->loaded);
+       ssize_t ret = recv_buffer(ct->fd, ct->buf + ct->loaded,
+               CLIENT_BUFSIZE - ct->loaded);
        if (!ret)
                return -E_SERVER_EOF;
        if (ret > 0)
-               pcd->loaded += ret;
+               ct->loaded += ret;
        return ret;
 
 }
@@ -159,151 +158,148 @@ static ssize_t client_recv_buffer(struct private_client_data *pcd)
  */
 static void client_post_select(struct sched *s, struct task *t)
 {
-       struct private_client_data *pcd = t->private_data;
+       struct client_task *ct = container_of(t, struct client_task, task);
 
-//     PARA_INFO_LOG("status %d\n", pcd->status);
-       t->ret = 1;
-       if (pcd->fd < 0)
+       t->error = 0;
+       if (ct->fd < 0)
                return;
-       if (!pcd->check_r && !pcd->check_w)
+       if (!ct->check_r && !ct->check_w)
                return;
-       if (pcd->check_r && !FD_ISSET(pcd->fd, &s->rfds))
+       if (ct->check_r && !FD_ISSET(ct->fd, &s->rfds))
                return;
-       if (pcd->check_w && !FD_ISSET(pcd->fd, &s->wfds))
+       if (ct->check_w && !FD_ISSET(ct->fd, &s->wfds))
                return;
-       switch (pcd->status) {
+       switch (ct->status) {
        case CL_CONNECTED: /* receive welcome message */
-               t->ret = client_recv_buffer(pcd);
-               if (t->ret > 0)
-                       pcd->status = CL_RECEIVED_WELCOME;
+               t->error = client_recv_buffer(ct);
+               if (t->error > 0)
+                       ct->status = CL_RECEIVED_WELCOME;
                return;
        case CL_RECEIVED_WELCOME: /* send auth command */
-               sprintf(pcd->buf, "auth %s%s", pcd->conf.plain_given?
-                       "" : "rc4 ", pcd->user);
-               PARA_INFO_LOG("--> %s\n", pcd->buf);
-               t->ret = send_buffer(pcd->fd, pcd->buf);
-               if (t->ret >= 0)
-                       pcd->status = CL_SENT_AUTH;
+               sprintf(ct->buf, "auth %s%s", ct->conf.plain_given?
+                       "" : "rc4 ", ct->user);
+               PARA_INFO_LOG("--> %s\n", ct->buf);
+               t->error = send_buffer(ct->fd, ct->buf);
+               if (t->error >= 0)
+                       ct->status = CL_SENT_AUTH;
                return;
        case CL_SENT_AUTH: /* receive challenge number */
-               pcd->loaded = 0;
-               t->ret = client_recv_buffer(pcd);
-               if (t->ret < 0)
+               ct->loaded = 0;
+               t->error = client_recv_buffer(ct);
+               if (t->error < 0)
                        return;
-               if (t->ret != 64) {
-                       t->ret = -E_INVALID_CHALLENGE;
-                       PARA_ERROR_LOG("received the following: %s\n", pcd->buf);
+               if (t->error != 64) {
+                       t->error = -E_INVALID_CHALLENGE;
+                       PARA_ERROR_LOG("received the following: %s\n", ct->buf);
                        return;
                }
                PARA_INFO_LOG("<-- [challenge]\n");
                /* decrypt challenge number */
-               t->ret = para_decrypt_challenge(pcd->key_file, &pcd->challenge_nr,
-                       (unsigned char *) pcd->buf, 64);
-               if (t->ret > 0)
-                       pcd->status = CL_RECEIVED_CHALLENGE;
+               t->error = para_decrypt_challenge(ct->key_file, &ct->challenge_nr,
+                       (unsigned char *) ct->buf, 64);
+               if (t->error > 0)
+                       ct->status = CL_RECEIVED_CHALLENGE;
                return;
        case CL_RECEIVED_CHALLENGE: /* send decrypted challenge */
-               PARA_INFO_LOG("--> %lu\n", pcd->challenge_nr);
-               t->ret = send_va_buffer(pcd->fd, "%s%lu", CHALLENGE_RESPONSE_MSG,
-                       pcd->challenge_nr);
-               if (t->ret > 0)
-                       pcd->status = CL_SENT_CH_RESPONSE;
+               PARA_INFO_LOG("--> %lu\n", ct->challenge_nr);
+               t->error = send_va_buffer(ct->fd, "%s%lu", CHALLENGE_RESPONSE_MSG,
+                       ct->challenge_nr);
+               if (t->error > 0)
+                       ct->status = CL_SENT_CH_RESPONSE;
                return;
        case CL_SENT_CH_RESPONSE: /* read server response */
                {
                size_t bytes_received;
                unsigned char rc4_buf[2 * RC4_KEY_LEN] = "";
-               pcd->loaded = 0;
-               t->ret = client_recv_buffer(pcd);
-               if (t->ret < 0)
+               ct->loaded = 0;
+               t->error = client_recv_buffer(ct);
+               if (t->error < 0)
                        return;
-               bytes_received = t->ret;
+               bytes_received = t->error;
                PARA_DEBUG_LOG("++++ server info ++++\n%s\n++++ end of server "
-                       "info ++++\n", pcd->buf);
+                       "info ++++\n", ct->buf);
                /* check if server has sent "Proceed" message */
-               t->ret = -E_CLIENT_AUTH;
-               if (!strstr(pcd->buf, PROCEED_MSG))
+               t->error = -E_CLIENT_AUTH;
+               if (!strstr(ct->buf, PROCEED_MSG))
                        return;
-               t->ret = 1;
-               pcd->status = CL_RECEIVED_PROCEED;
+               t->error = 0;
+               ct->status = CL_RECEIVED_PROCEED;
                if (bytes_received < PROCEED_MSG_LEN + 32)
                        return;
                PARA_INFO_LOG("decrypting session key\n");
-               t->ret = para_decrypt_buffer(pcd->key_file, rc4_buf,
-                       (unsigned char *)pcd->buf + PROCEED_MSG_LEN + 1,
+               t->error = para_decrypt_buffer(ct->key_file, rc4_buf,
+                       (unsigned char *)ct->buf + PROCEED_MSG_LEN + 1,
                        bytes_received - PROCEED_MSG_LEN - 1);
-               if (t->ret < 0)
+               if (t->error < 0)
                        return;
-               RC4_set_key(&pcd->rc4_send_key, RC4_KEY_LEN, rc4_buf);
-               RC4_set_key(&pcd->rc4_recv_key, RC4_KEY_LEN, rc4_buf + RC4_KEY_LEN);
-               enable_crypt(pcd->fd, rc4_recv, rc4_send, pcd);
+               RC4_set_key(&ct->rc4_send_key, RC4_KEY_LEN, rc4_buf);
+               RC4_set_key(&ct->rc4_recv_key, RC4_KEY_LEN, rc4_buf + RC4_KEY_LEN);
+               enable_crypt(ct->fd, rc4_recv, rc4_send, ct);
                }
        case CL_RECEIVED_PROCEED: /* concat args and send command */
                {
                int i;
                char *command = NULL;
-               for (i = 0; i < pcd->conf.inputs_num; i++) {
+               for (i = 0; i < ct->conf.inputs_num; i++) {
                        char *tmp = command;
                        command = make_message("%s\n%s", command?
-                               command : "", pcd->conf.inputs[i]);
+                               command : "", ct->conf.inputs[i]);
                        free(tmp);
                }
                command = para_strcat(command, EOC_MSG "\n");
                PARA_DEBUG_LOG("--> %s\n", command);
-               t->ret = send_buffer(pcd->fd, command);
+               t->error = send_buffer(ct->fd, command);
                free(command);
-               if (t->ret > 0)
-                       pcd->status = CL_SENT_COMMAND;
+               if (t->error > 0)
+                       ct->status = CL_SENT_COMMAND;
                return;
                }
        case CL_SENT_COMMAND:
-               pcd->loaded = 0;
-               t->ret = client_recv_buffer(pcd);
-               if (t->ret < 0)
+               ct->loaded = 0;
+               t->error = client_recv_buffer(ct);
+               if (t->error < 0)
                        return;
-               t->ret = -E_HANDSHAKE_COMPLETE;
-               if (strstr(pcd->buf, AWAITING_DATA_MSG))
-                       pcd->status = CL_SENDING;
+               if (strstr(ct->buf, AWAITING_DATA_MSG))
+                       ct->status = CL_SENDING;
                else
-                       pcd->status = CL_RECEIVING;
+                       ct->status = CL_RECEIVING;
                return;
        case CL_SENDING: /* FIXME: might block */
-               PARA_INFO_LOG("loaded: %zd\n", *pcd->in_loaded);
-               t->ret = send_bin_buffer(pcd->fd, pcd->inbuf, *pcd->in_loaded);
-               if (t->ret < 0)
+               PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
+               t->error = send_bin_buffer(ct->fd, ct->inbuf, *ct->in_loaded);
+               if (t->error < 0)
                        return;
-               *pcd->in_loaded = 0;
+               *ct->in_loaded = 0;
                return;
        case CL_RECEIVING:
-               t->ret = client_recv_buffer(pcd);
+               t->error = client_recv_buffer(ct);
                return;
        }
 }
 
 /* connect to para_server and register the client task */
-static int client_connect(struct private_client_data *pcd)
+static int client_connect(struct client_task *ct)
 {
        int ret;
 
-       pcd->fd = -1;
-       ret = makesock(AF_UNSPEC, IPPROTO_TCP, 0, pcd->conf.hostname_arg,
-               pcd->conf.server_port_arg);
+       ct->fd = -1;
+       ret = makesock(AF_UNSPEC, IPPROTO_TCP, 0, ct->conf.hostname_arg,
+               ct->conf.server_port_arg);
        if (ret < 0)
                return ret;
-       pcd->fd = ret;
-       pcd->status = CL_CONNECTED;
-       ret = mark_fd_nonblocking(pcd->fd);
+       ct->fd = ret;
+       ct->status = CL_CONNECTED;
+       ret = mark_fd_nonblocking(ct->fd);
        if (ret < 0)
                goto err_out;
-       pcd->task.pre_select = client_pre_select;
-       pcd->task.post_select = client_post_select;
-       pcd->task.private_data = pcd;
-       sprintf(pcd->task.status, "client");
-       register_task(&pcd->task);
+       ct->task.pre_select = client_pre_select;
+       ct->task.post_select = client_post_select;
+       sprintf(ct->task.status, "client");
+       register_task(&ct->task);
        return 1;
 err_out:
-       close(pcd->fd);
-       pcd->fd = -1;
+       close(ct->fd);
+       ct->fd = -1;
        return ret;
 }
 
@@ -321,33 +317,32 @@ err_out:
  *
  * \return Standard.
  */
-int client_open(int argc, char *argv[], struct private_client_data **pcd_ptr)
+int client_open(int argc, char *argv[], struct client_task **ct_ptr)
 {
        char *home = para_homedir();
        struct stat statbuf;
        int ret;
-       struct private_client_data *pcd =
-               para_calloc(sizeof(struct private_client_data));
+       struct client_task *ct = para_calloc(sizeof(struct client_task));
 
-       *pcd_ptr = pcd;
-       pcd->fd = -1;
-       ret = client_cmdline_parser(argc, argv, &pcd->conf);
-       HANDLE_VERSION_FLAG("client", pcd->conf);
+       *ct_ptr = ct;
+       ct->fd = -1;
+       ret = client_cmdline_parser(argc, argv, &ct->conf);
+       HANDLE_VERSION_FLAG("client", ct->conf);
        ret = -E_CLIENT_SYNTAX;
-       if (!pcd->conf.inputs_num)
+       if (!ct->conf.inputs_num)
                goto out;
-       pcd->user = pcd->conf.user_given?
-               para_strdup(pcd->conf.user_arg) : para_logname();
+       ct->user = ct->conf.user_given?
+               para_strdup(ct->conf.user_arg) : para_logname();
 
-       pcd->key_file = pcd->conf.key_file_given?
-               para_strdup(pcd->conf.key_file_arg) :
-               make_message("%s/.paraslash/key.%s", home, pcd->user);
+       ct->key_file = ct->conf.key_file_given?
+               para_strdup(ct->conf.key_file_arg) :
+               make_message("%s/.paraslash/key.%s", home, ct->user);
 
-       pcd->config_file = pcd->conf.config_file_given?
-               para_strdup(pcd->conf.config_file_arg) :
+       ct->config_file = ct->conf.config_file_given?
+               para_strdup(ct->conf.config_file_arg) :
                make_message("%s/.paraslash/client.conf", home);
-       ret = stat(pcd->config_file, &statbuf);
-       if (ret && pcd->conf.config_file_given) {
+       ret = stat(ct->config_file, &statbuf);
+       if (ret && ct->conf.config_file_given) {
                ret = -E_NO_CONFIG;
                goto out;
        }
@@ -358,22 +353,22 @@ int client_open(int argc, char *argv[], struct private_client_data **pcd_ptr)
                        .check_required = 0,
                        .check_ambiguity = 0
                };
-               client_cmdline_parser_config_file(pcd->config_file,
-                       &pcd->conf, &params);
+               client_cmdline_parser_config_file(ct->config_file,
+                       &ct->conf, &params);
        }
        ret = 1;
-       PARA_INFO_LOG("loglevel: %d\n", pcd->conf.loglevel_arg);
-       PARA_INFO_LOG("config_file: %s\n", pcd->config_file);
-       PARA_INFO_LOG("key_file: %s\n", pcd->key_file);
-       PARA_NOTICE_LOG("connecting %s:%d\n", pcd->conf.hostname_arg,
-               pcd->conf.server_port_arg);
-       ret = client_connect(pcd);
+       PARA_INFO_LOG("loglevel: %d\n", ct->conf.loglevel_arg);
+       PARA_INFO_LOG("config_file: %s\n", ct->config_file);
+       PARA_INFO_LOG("key_file: %s\n", ct->key_file);
+       PARA_NOTICE_LOG("connecting %s:%d\n", ct->conf.hostname_arg,
+               ct->conf.server_port_arg);
+       ret = client_connect(ct);
 out:
        free(home);
        if (ret < 0) {
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
-               client_close(pcd);
-               *pcd_ptr = NULL;
+               client_close(ct);
+               *ct_ptr = NULL;
        }
        return ret;
 }
index 07f6166..d0bc535 100644 (file)
@@ -102,48 +102,44 @@ static void *dccp_recv_parse_config(int argc, char **argv)
 
 static void dccp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_dccp_recv_data *pdd = rn->private_data;
 
-       t->ret = 1;
+       t->error = 0;
        para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
 }
 
 static void dccp_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_dccp_recv_data *pdd = rn->private_data;
 
-       if (rn->output_error && *rn->output_error) {
-               t->ret = *rn->output_error;
-               goto out;
+       if (rn->output_error && *rn->output_error < 0) {
+               t->error = *rn->output_error;
+               return;
        }
-       t->ret = 1;
-       if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds))
-               goto out; /* nothing to do */
-       t->ret = -E_DCCP_OVERRUN;
-       if (rn->loaded >= DCCP_BUFSIZE)
-               goto out;
-       t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+       if (!FD_ISSET(pdd->fd, &s->rfds))
+               return; /* nothing to do */
+       if (rn->loaded >= DCCP_BUFSIZE) {
+               t->error = -E_DCCP_OVERRUN;
+               return;
+       }
+       t->error = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
                DCCP_BUFSIZE - rn->loaded);
-       if (t->ret <= 0) {
-               if (!t->ret)
-                       t->ret = -E_RECV_EOF;
-               goto out;
+       if (t->error > 0) {
+               rn->loaded += t->error;
+               return;
        }
-       rn->loaded += t->ret;
-       return;
-out:
-       if (t->ret < 0)
-               rn->error = t->ret;
+       if (!t->error)
+               t->error = -E_RECV_EOF;
 }
 
 /**
- * the init function of the dccp receiver
+ * The init function of the dccp receiver.
  *
- * \param r pointer to the receiver struct to initialize
+ * \param r Pointer to the receiver struct to initialize.
  *
- * Initialize all function pointers of \a r
+ * Initialize all function pointers of \a r.
  */
 void dccp_recv_init(struct receiver *r)
 {
diff --git a/error.h b/error.h
index b9e19f5..6b8403d 100644 (file)
--- a/error.h
+++ b/error.h
@@ -27,6 +27,8 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define RBTREE_ERRORS
 #define RECV_ERRORS
 #define SEND_COMMON_ERRORS
+#define STDOUT_ERRORS
+
 
 extern const char **para_errlist[];
 
@@ -160,7 +162,6 @@ extern const char **para_errlist[];
        PARA_ERROR(NO_CONFIG, "config file not found"), \
        PARA_ERROR(CLIENT_AUTH, "authentication failed"), \
        PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \
-       PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \
 
 
 #define SCHED_ERRORS \
@@ -170,13 +171,9 @@ extern const char **para_errlist[];
 
 
 #define STDIN_ERRORS \
-       PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
        PARA_ERROR(STDIN_EOF, "end of file"), \
 
 
-#define STDOUT_ERRORS \
-       PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
-
 
 #define NET_ERRORS \
        PARA_ERROR(NAME_TOO_LONG, "name too long for struct sockaddr_un"), \
@@ -210,7 +207,6 @@ extern const char **para_errlist[];
        PARA_ERROR(NO_MORE_SLOTS, "no more empty slots"), \
        PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \
        PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \
-       PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
        PARA_ERROR(NOT_PLAYING, "not playing"), \
 
 
@@ -363,7 +359,6 @@ extern const char **para_errlist[];
 
 #define WRITE_ERRORS \
        PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
-       PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
        PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
        PARA_ERROR(WAV_HEADER_SUCCESS, "successfully read wave header"), \
        PARA_ERROR(NO_DELAY, "no initial delay"), \
index af62ba6..68c97b4 100644 (file)
--- a/filter.c
+++ b/filter.c
@@ -39,12 +39,6 @@ static struct filter_args_info conf;
 
 INIT_STDERR_LOGGING(conf.loglevel_arg);
 
-static void filter_event_handler(struct task *t)
-{
-       PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-       unregister_task(t);
-}
-
 static void open_filters(void)
 {
        struct filter_node *fn;
@@ -66,12 +60,10 @@ static int init_filter_chain(void)
 
        fc->inbuf = sit->buf;
        fc->in_loaded = &sit->loaded;
-       fc->input_error = &sit->error;
-       fc->error = 0;
-       fc->output_error = &sot->error;
-       fc->task.private_data = fc;
+       fc->input_error = &sit->task.error;
+       fc->task.error = 0;
+       fc->output_error = &sot->task.error;
        fc->task.pre_select = filter_pre_select;
-       fc->task.event_handler = filter_event_handler;
        sprintf(fc->task.status, "filter chain");
 
        for (i = 0; i < conf.filter_given; i++) {
@@ -160,7 +152,7 @@ int main(int argc, char *argv[])
        stdout_set_defaults(sot);
        sot->buf = fc->outbuf;
        sot->loaded = fc->out_loaded;
-       sot->input_error = &fc->error;
+       sot->input_error = &fc->task.error;
 
        register_task(&sit->task);
        register_task(&fc->task);
index 693579b..5cb7aaf 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -40,8 +40,6 @@ struct filter_chain {
        size_t *in_loaded;
        /** Contains the number of bytes loaded in the output buffer. */
        size_t *out_loaded;
-       /** Non-zero if this filter wont' produce any more output. */
-       int error;
        /** Pointer to the error variable of the receiving application. */
        int *input_error;
        /** Pointer to the error variable of the writing application. */
index f0b6095..9755392 100644 (file)
@@ -109,15 +109,15 @@ static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen,
  */
 void filter_pre_select(__a_unused struct sched *s, struct task *t)
 {
-       struct filter_chain *fc = t->private_data;
+       struct filter_chain *fc = container_of(t, struct filter_chain, task);
        struct filter_node *fn;
        char *ib;
        size_t *loaded;
        int conv, conv_total = 0;
 
-       if (fc->output_error && *fc->output_error) {
-               t->ret =  *fc->output_error;
-               goto err_out;
+       if (fc->output_error && *fc->output_error < 0) {
+               t->error =  *fc->output_error;
+               return;
        }
 again:
        ib = fc->inbuf;
@@ -128,10 +128,10 @@ again:
                        size_t size, old_fn_loaded = fn->loaded;
 //                     PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
 //                             fc, *loaded, fn->filter->name);
-                       t->ret = fn->filter->convert(ib, *loaded, fn);
-                       if (t->ret < 0)
-                               goto err_out;
-                       size = t->ret;
+                       t->error = fn->filter->convert(ib, *loaded, fn);
+                       if (t->error < 0)
+                               return;
+                       size = t->error;
                        call_callbacks(fn, ib, size, fn->buf + old_fn_loaded,
                                fn->loaded - old_fn_loaded);
                        *loaded -= size;
@@ -153,16 +153,13 @@ again:
 //             fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
-       t->ret = 1;
        if (!*fc->input_error)
                return;
        if (*fc->out_loaded)
                return;
        if (*fc->in_loaded && conv_total)
                return;
-       t->ret = -E_FC_EOF;
-err_out:
-       fc->error = t->ret;
+       t->error = -E_FC_EOF;
 }
 
 /**
index a3167bd..2e518b2 100644 (file)
@@ -84,10 +84,10 @@ static char *make_request_msg(void)
 
 static void http_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_http_recv_data *phd = rn->private_data;
 
-       t->ret = 1;
+       t->error = 0;
        if  (phd->status == HTTP_CONNECTED)
                para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
        else
@@ -97,53 +97,47 @@ static void http_recv_pre_select(struct sched *s, struct task *t)
 
 static void http_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_http_recv_data *phd = rn->private_data;
 
-       if (rn->output_error && *rn->output_error) {
-               t->ret = *rn->output_error;
-               goto out;
+       if (rn->output_error && *rn->output_error < 0) {
+               t->error = *rn->output_error;
+               return;
        }
-       t->ret = 1;
-       if (!s->select_ret)
-               goto out;
        if  (phd->status == HTTP_CONNECTED) {
                char *rq;
                if (!FD_ISSET(phd->fd, &s->wfds))
-                       goto out;
+                       return;
                rq = make_request_msg();
                PARA_INFO_LOG("sending http request\n");
-               t->ret = send_va_buffer(phd->fd, "%s", rq);
+               t->error = send_va_buffer(phd->fd, "%s", rq);
                free(rq);
-               if (t->ret > 0)
+               if (t->error >= 0)
                        phd->status = HTTP_SENT_GET_REQUEST;
-               goto out;
+               return;
        }
        if (!FD_ISSET(phd->fd, &s->rfds))
-               goto out;
+               return;
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
-               if (t->ret < 0)
-                       goto out;
-               PARA_INFO_LOG("received ok msg, streaming\n");
-               t->ret = 1;
-               phd->status = HTTP_STREAMING;
-               goto out;
+               t->error = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+               if (t->error >= 0) {
+                       PARA_INFO_LOG("received ok msg, streaming\n");
+                       phd->status = HTTP_STREAMING;
+               }
+               return;
        }
-       t->ret = -E_HTTP_RECV_OVERRUN;
-       if (rn->loaded >= BUFSIZE)
-               goto out;
-       t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+       if (rn->loaded >= BUFSIZE) {
+               t->error = -E_HTTP_RECV_OVERRUN;
+               return;
+       }
+       t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
                BUFSIZE - rn->loaded);
-       if (t->ret <= 0) {
-               if (!t->ret)
-                       t->ret = -E_RECV_EOF;
-               goto out;
+       if (t->error > 0) {
+               rn->loaded += t->error;
+               return;
        }
-       rn->loaded += t->ret;
-out:
-       if (t->ret < 0)
-               rn->error = t->ret;
+       if (!t->error)
+               t->error = -E_RECV_EOF;
 }
 
 static void http_recv_close(struct receiver_node *rn)
index 83f832a..164c1cb 100644 (file)
@@ -85,7 +85,7 @@ static int msg_to_buf(mblk_t *mp, char *buffer, int len)
 
 static void ortp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_ortp_recv_data *pord = rn->private_data;
        struct timeval tmp;
 
@@ -95,7 +95,6 @@ static void ortp_recv_pre_select(struct sched *s, struct task *t)
        }
        if (tv_diff(&s->timeout, &tmp, NULL) > 0)
                s->timeout = tmp;
-       t->ret = 1;
 }
 
 static void compute_next_chunk(unsigned chunk_time,
@@ -114,7 +113,7 @@ static void compute_next_chunk(unsigned chunk_time,
 
 static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = t->private_data;
+       struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_ortp_recv_data *pord = rn->private_data;
        mblk_t *mp;
        int packet_type, stream_type;
@@ -123,12 +122,10 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
        size_t packet_size;
 
 //     PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
-       if (rn->output_error && *rn->output_error) {
-               rn->error = *rn->output_error;
-               t->ret = rn->error;
+       if (rn->output_error && *rn->output_error < 0) {
+               t->error = *rn->output_error;
                return;
        }
-       t->ret = 1;
        if (pord->start.tv_sec)
                if (tv_diff(now, &pord->next_chunk, NULL) < 0)
                        return;
@@ -138,26 +135,25 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
 //             PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
 //                     pord->timestamp, rn->loaded, pord->c_bad);
                pord->c_bad++;
-               t->ret = -E_TOO_MANY_BAD_CHUNKS;
-               if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
+               if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000) {
+                       t->error = -E_TOO_MANY_BAD_CHUNKS;
                        return;
-               t->ret = 1;
+               }
                tv_add(now, &min_delay, &pord->next_chunk);
                return;
        }
        /* okay, we have a chunk of data */
        if (!pord->start.tv_sec)
                pord->start = *now;
-       t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
-       if (t->ret < ORTP_AUDIO_HEADER_LEN) {
-               if (t->ret < 0)
-                       t->ret = -E_MSG_TO_BUF;
+       t->error = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+       if (t->error < ORTP_AUDIO_HEADER_LEN) {
+               if (t->error < 0)
+                       t->error = -E_MSG_TO_BUF;
                else
-                       t->ret = -E_ORTP_RECV_EOF;
-               rn->error = t->ret;
+                       t->error = -E_ORTP_RECV_EOF;
                goto err_out;
        }
-       packet_size = t->ret;
+       packet_size = t->error;
        packet_type = READ_PACKET_TYPE(tmpbuf);
        stream_type = READ_STREAM_TYPE(tmpbuf);
        chunk_time = READ_CHUNK_TIME(tmpbuf);
@@ -168,8 +164,7 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
        switch (packet_type) {
        unsigned header_len, payload_len;
        case ORTP_EOF:
-               t->ret = -E_RECV_EOF;
-               rn->error = t->ret;
+               t->error = -E_RECV_EOF;
                goto err_out;
        case ORTP_BOF:
                PARA_INFO_LOG("bof (%zu)\n", packet_size);
@@ -179,9 +174,10 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
                if (!pord->have_header && stream_type)
                /* can't use the data, wait for header */
                        goto success;
-               t->ret = -E_OVERRUN;
-               if (packet_size + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN)
+               if (packet_size + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+                       t->error = -E_OVERRUN;
                        goto err_out;
+               }
                if (packet_size > ORTP_AUDIO_HEADER_LEN) {
                        memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
                                packet_size - ORTP_AUDIO_HEADER_LEN);
@@ -199,21 +195,22 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
                        rn->loaded = packet_size - ORTP_AUDIO_HEADER_LEN;
                        goto success;
                }
-               t->ret = -E_INVALID_HEADER;
-               if (header_len + ORTP_AUDIO_HEADER_LEN > packet_size)
+               if (header_len + ORTP_AUDIO_HEADER_LEN > packet_size) {
+                       t->error = -E_INVALID_HEADER;
                        goto err_out;
+               }
                payload_len = packet_size - ORTP_AUDIO_HEADER_LEN - header_len;
-               t->ret = -E_OVERRUN;
-               if (rn->loaded + payload_len > CHUNK_SIZE)
+               if (rn->loaded + payload_len > CHUNK_SIZE) {
+                       t->error = -E_OVERRUN;
                        goto err_out;
+               }
                if (payload_len)
                        memcpy(rn->buf + rn->loaded, tmpbuf
                                + (packet_size - payload_len), payload_len);
                rn->loaded += payload_len;
-               goto success;
        }
 success:
-       t->ret = 1;
+       t->error = 0;
        freemsg(mp);
        if (pord->c_bad) {
                pord->c_bad = 0;
@@ -222,7 +219,6 @@ success:
        compute_next_chunk(chunk_time, pord);
        return;
 err_out:
-       rn->error = t->ret;
        freemsg(mp);
 }
 
diff --git a/recv.c b/recv.c
index 55e97f1..f9e5b93 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -45,14 +45,6 @@ static void *parse_config(int argc, char *argv[], int *receiver_num)
        return check_receiver_arg(conf.receiver_arg, receiver_num);
 }
 
-static void rn_event_handler(struct task *t)
-{
-       struct receiver_node *rn = t->private_data;
-       PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
-       rn->error = t->ret;
-       unregister_task(t);
-}
-
 /**
  * the main function of para_recv
  *
@@ -94,13 +86,11 @@ int main(int argc, char *argv[])
        stdout_set_defaults(&sot);
        sot.buf = rn.buf;
        sot.loaded = &rn.loaded;
-       sot.input_error = &rn.error;
+       sot.input_error = &rn.task.error;
        register_task(&sot.task);
 
-       rn.task.private_data = &rn;
        rn.task.pre_select = r->pre_select;
        rn.task.post_select = r->post_select;
-       rn.task.event_handler = rn_event_handler;
        sprintf(rn.task.status, "receiver node");
        register_task(&rn.task);
 
diff --git a/recv.h b/recv.h
index ab9bc49..d52ceba 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -18,8 +18,6 @@ struct receiver_node {
        size_t loaded;
        /** receiver-specific data */
        void *private_data;
-       /** Set to non-zero error value on errors or on end of file. */
-       int error;
        /** Pointer to the error member of the consumer. */
        int *output_error;
        /** pointer to the configuration data for this instance */
diff --git a/sched.c b/sched.c
index 873fb8e..9976a11 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -24,19 +24,35 @@ static int initialized;
 static struct timeval now_struct;
 struct timeval *now = &now_struct;
 
+/**
+ * Remove a task from the scheduler.
+ *
+ * \param t the task to remove
+ *
+ * If the pre_select pointer of \a t is not \p NULL, it is removed from
+ * the pre_select list of the scheduler. Same goes for \a post_select.
+ */
+void unregister_task(struct task *t)
+{
+       if (!initialized)
+               return;
+       PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
+       if (t->pre_select)
+               list_del(&t->pre_select_node);
+       if (t->post_select)
+               list_del(&t->post_select_node);
+};
+
+
 static void sched_preselect(struct sched *s)
 {
        struct task *t, *tmp;
-again:
        list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
                t->pre_select(s, t);
 //             PARA_INFO_LOG("%s \n", t->status);
-               if (t->ret > 0)
+               if (t->error >= 0)
                        continue;
-               if (!t->event_handler)
-                       continue;
-               t->event_handler(t);
-               goto again;
+               unregister_task(t);
        }
 }
 
@@ -47,9 +63,9 @@ static void sched_post_select(struct sched *s)
        list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
                t->post_select(s, t);
 //             PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
-               if (t->ret > 0 || !t->event_handler)
+               if (t->error >= 0)
                        continue;
-               t->event_handler(t);
+               unregister_task(t);
        }
 }
 
@@ -70,6 +86,8 @@ static void sched_post_select(struct sched *s)
  */
 int schedule(struct sched *s)
 {
+       int ret;
+
        if (!initialized)
                return -E_NOT_INITIALIZED;
        gettimeofday(now, NULL);
@@ -79,10 +97,9 @@ again:
        s->timeout = s->default_timeout;
        s->max_fileno = -1;
        sched_preselect(s);
-       s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
-               &s->wfds, &s->timeout);
-       if (s->select_ret < 0)
-               return s->select_ret;
+       ret = para_select(s->max_fileno + 1, &s->rfds, &s->wfds, &s->timeout);
+       if (ret < 0)
+               return ret;
        gettimeofday(now, NULL);
        sched_post_select(s);
        if (list_empty(&pre_select_list) && list_empty(&post_select_list))
@@ -118,33 +135,14 @@ void register_task(struct task *t)
        PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
        if (t->pre_select) {
                PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
-               para_list_add(&t->pre_select_node, &pre_select_list);
+               list_add_tail(&t->pre_select_node, &pre_select_list);
        }
        if (t->post_select) {
                PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
-               para_list_add(&t->post_select_node, &post_select_list);
+               list_add_tail(&t->post_select_node, &post_select_list);
        }
 }
 
-/**
- * remove a task from the scheduler
- *
- * \param t the task to remove
- *
- * If the pre_select pointer of \a t is not \p NULL, it is removed from
- * the pre_select list of the scheduler. Same goes for \a post_select.
- */
-void unregister_task(struct task *t)
-{
-       if (!initialized)
-               return;
-       PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
-       if (t->pre_select)
-               list_del(&t->pre_select_node);
-       if (t->post_select)
-               list_del(&t->post_select_node);
-};
-
 /**
  * unregister all tasks
  *
@@ -204,8 +202,8 @@ char *get_task_list(void)
  *
  * \param id the task identifier
  *
- * Find the task identified by \a id, set the tasks' return value to
- * \p -E_TASK_KILLED and call the event handler of the task.
+ * Find the task identified by \a id, set the tasks' error value to
+ * \p -E_TASK_KILLED and unregister the task.
  *
  * \return Positive on success, negative on errors (e.g. if \a id does not
  * correspond to a registered task).
@@ -221,18 +219,16 @@ int kill_task(char *id)
                sprintf(buf, "%p", t);
                if (strcmp(id, buf))
                        continue;
-               t->ret = -E_TASK_KILLED;
-               if (t->event_handler)
-                       t->event_handler(t);
+               t->error = -E_TASK_KILLED;
+               unregister_task(t);
                return 1;
        }
        list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
                sprintf(buf, "%p", t);
                if (strcmp(id, buf))
                        continue;
-               t->ret = -E_TASK_KILLED;
-               if (t->event_handler)
-                       t->event_handler(t);
+               t->error = -E_TASK_KILLED;
+               unregister_task(t);
                return 1;
        }
        return -E_NO_SUCH_TASK;
diff --git a/sched.h b/sched.h
index 7ab8308..36d0769 100644 (file)
--- a/sched.h
+++ b/sched.h
@@ -27,8 +27,6 @@ struct sched {
        fd_set wfds;
        /** highest numbered file descriptor in any of the above fd sets */
        int max_fileno;
-       /** the return value of the previous select call */
-       int select_ret;
 };
 
 /**
@@ -46,8 +44,6 @@ struct sched {
  * \sa struct sched
  */
 struct task {
-       /** pointer to the struct this task is embedded in */
-       void *private_data;
        /**
         * the pre select hook of \a t
         *
@@ -61,10 +57,8 @@ struct task {
         * evaluate and act upon the results of the previous select call.
         */
        void (*post_select)(struct sched *s, struct task *t);
-       /** gets called if pre_select or post_select returned an error */
-       void (*event_handler)(struct task *t);
-       /** pre_select() and post_select store their return value here */
-       int ret;
+       /** Whether this task is in error state. */
+       int error;
        /** position of the task in the pre_select list of the scheduler */
        struct list_head pre_select_node;
        /** position of the task in the post_select list of the scheduler */
diff --git a/stdin.c b/stdin.c
index 4e56700..438883a 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -30,8 +30,8 @@
  */
 static void stdin_pre_select(struct sched *s, struct task *t)
 {
-       struct stdin_task *sit = t->private_data;
-       t->ret = 1;
+       struct stdin_task *sit = container_of(t, struct stdin_task, task);
+       t->error = 0;
        sit->check_fd = 0;
        if (sit->loaded >= sit->bufsize)
                return;
@@ -39,12 +39,6 @@ static void stdin_pre_select(struct sched *s, struct task *t)
        para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
 }
 
-static void stdin_default_event_handler(struct task *t)
-{
-       PARA_NOTICE_LOG("%p: %s\n", t, para_strerror(-t->ret));
-       unregister_task(t);
-}
-
 /**
  * The post select function of the stdin task.
  *
@@ -58,24 +52,21 @@ static void stdin_default_event_handler(struct task *t)
  */
 static void stdin_post_select(struct sched *s, struct task *t)
 {
-       struct stdin_task *sit = t->private_data;
+       struct stdin_task *sit = container_of(t, struct stdin_task, task);
        ssize_t ret;
 
-       t->ret = 1;
+       t->error = 0;
        if (!sit->check_fd)
                return;
        if (!FD_ISSET(STDIN_FILENO, &s->rfds))
                return;
        ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
        if (ret < 0)
-               t->ret = -E_STDIN_READ;
-       else if (ret > 0) {
+               t->error = ERRNO_TO_PARA_ERROR(errno);
+       else if (ret > 0)
                sit->loaded += ret;
-               t->ret = ret;
-       } else
-               t->ret = -E_STDIN_EOF;
-       if (t->ret < 0)
-               sit->error = t->ret;
+       else
+               t->error = -E_STDIN_EOF;
 }
 
 /**
@@ -94,11 +85,8 @@ void stdin_set_defaults(struct stdin_task *sit)
 
        sit->bufsize = 16 * 1024,
        sit->loaded = 0,
-       sit->error = 0,
        sit->task.pre_select = stdin_pre_select;
        sit->task.post_select = stdin_post_select;
-       sit->task.event_handler = stdin_default_event_handler;
-       sit->task.private_data = sit;
        sprintf(sit->task.status, "stdin reader");
        ret = mark_fd_nonblocking(STDIN_FILENO);
        if (ret >= 0)
diff --git a/stdin.h b/stdin.h
index 6a2b1d6..63d00d5 100644 (file)
--- a/stdin.h
+++ b/stdin.h
@@ -18,8 +18,6 @@ struct stdin_task {
        int check_fd;
        /** The task structure. */
        struct task task;
-       /** Non-zero on read error, or if a read from stdin returned zero. */
-       int error;
 };
 
 void stdin_set_defaults(struct stdin_task *sit);
index a192f2b..137030e 100644 (file)
--- a/stdout.c
+++ b/stdout.c
  */
 static void stdout_pre_select(struct sched *s, struct task *t)
 {
-       struct stdout_task *sot = t->private_data;
+       struct stdout_task *sot = container_of(t, struct stdout_task, task);
 
-       t->ret = 1;
+       t->error = 0;
        sot->check_fd = 0;
        if (!*sot->loaded) {
-               if (*sot->input_error) {
-                       t->ret = *sot->input_error;
+               if (*sot->input_error < 0) {
+                       t->error = *sot->input_error;
                        s->timeout.tv_sec = 0;
                        s->timeout.tv_usec = 1;
                }
@@ -57,31 +57,25 @@ static void stdout_pre_select(struct sched *s, struct task *t)
  */
 static void stdout_post_select(struct sched *s, struct task *t)
 {
-       struct stdout_task *sot = t->private_data;
+       struct stdout_task *sot = container_of(t, struct stdout_task, task);
        ssize_t ret;
 
-       t->ret = 1;
+       t->error = 0;
        if (!sot->check_fd) {
                if (*sot->input_error)
-                       t->ret = *sot->input_error;
+                       t->error = *sot->input_error;
                return;
        }
        if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
                return;
-       t->ret = -E_STDOUT_WRITE;
        ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
-       if (ret <= 0)
+       if (ret < 0) {
+               t->error = -ERRNO_TO_PARA_ERROR(errno);
                return;
+       }
        *sot->loaded -= ret;
        if (*sot->loaded)
                memmove(sot->buf, sot->buf + ret, *sot->loaded);
-       t->ret = 1;
-}
-
-static void stdout_default_event_handler(struct task *t)
-{
-       PARA_NOTICE_LOG("%p: %s\n", t, para_strerror(-t->ret));
-       unregister_task(t);
 }
 
 /**
@@ -97,11 +91,8 @@ void stdout_set_defaults(struct stdout_task *sot)
 {
        int ret;
 
-       sot->task.private_data = sot;
        sot->task.pre_select = stdout_pre_select;
        sot->task.post_select = stdout_post_select;
-       sot->task.event_handler = stdout_default_event_handler;
-       sot->error = 0;
        sprintf(sot->task.status, "stdout writer");
        ret = mark_fd_nonblocking(STDOUT_FILENO);
        if (ret >= 0)
index 652c7e9..6a6400c 100644 (file)
--- a/stdout.h
+++ b/stdout.h
@@ -16,8 +16,6 @@ struct stdout_task {
        size_t *loaded;
        /** Pointer to the error variable of the feeding task. */
        int *input_error;
-       /** Non-zero if a write error occurred. */
-       int error;
        /** The task structure. */
        struct task task;
        /** Whether \p STDOUT_FILENO was included in the write fd set. */
diff --git a/write.c b/write.c
index 672f3e2..c692dc9 100644 (file)
--- a/write.c
+++ b/write.c
@@ -29,12 +29,11 @@ struct check_wav_task {
        /** Number of bytes loaded in \a buf. */
        size_t *loaded;
        /** Non-zero if an error occurred or end of file was reached. */
-       int *error;
+       int *input_error;
        /** Number of channels specified in wav header given by \a buf. */
        unsigned channels;
        /** Sample rate specified in wav header given by \a buf. */
        unsigned samplerate;
-       /** The task structure for this task. */
        struct task task;
 };
 
@@ -47,9 +46,12 @@ struct initial_delay_task {
 };
 
 static struct write_args_info conf;
+
 static struct stdin_task sit;
-static struct check_wav_task cwt;
-static struct initial_delay_task idt;
+
+static struct check_wav_task the_check_wav_task;
+static struct initial_delay_task the_initial_delay_task;
+
 static struct writer_node_group *wng;
 
 /** Length of a standard wav header. */
@@ -63,39 +65,52 @@ static struct writer_node_group *wng;
  */
 static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
 {
-       struct check_wav_task *wt = t->private_data;
+       struct check_wav_task *cwt = container_of(t, struct check_wav_task, task);
        unsigned char *a;
+       int ret;
 
-       if (*wt->loaded < WAV_HEADER_LEN) {
-               t->ret = *wt->error? -E_PREMATURE_END : 1;
+       if (*cwt->loaded < WAV_HEADER_LEN) {
+               if (*cwt->input_error < 0)
+                       t->error = *cwt->input_error;
                return;
        }
-       wt->channels = 2;
-       wt->samplerate = 44100;
-       a = (unsigned char*)wt->buf;
-       t->ret = -E_NO_WAV_HEADER;
-       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
-               return;
-       wt->channels = (unsigned) a[22];
-       wt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
-       *wt->loaded -= WAV_HEADER_LEN;
-       memmove(wt->buf, wt->buf + WAV_HEADER_LEN, *wt->loaded);
-       t->ret = -E_WAV_HEADER_SUCCESS;
-       PARA_INFO_LOG("channels: %d, sample rate: %d\n", wt->channels, wt->samplerate);
+       cwt->channels = 2;
+       cwt->samplerate = 44100;
+       a = (unsigned char*)cwt->buf;
+       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') {
+               PARA_NOTICE_LOG("wav header not found\n");
+               t->error = -E_NO_WAV_HEADER;
+               goto out;
+       }
+       cwt->channels = (unsigned) a[22];
+       cwt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+       *cwt->loaded -= WAV_HEADER_LEN;
+       memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+       t->error = -E_WAV_HEADER_SUCCESS;
+       PARA_INFO_LOG("channels: %d, sample rate: %d\n", cwt->channels, cwt->samplerate);
+out:
+       wng->channels = &cwt->channels;
+       wng->samplerate = &cwt->samplerate;
+       ret = wng_open(wng);
+       if (ret < 0)
+               t->error = ret;
+       s->timeout.tv_sec = 0;
+       s->timeout.tv_usec = 1;
 }
 
 static void initial_delay_pre_select(struct sched *s, struct task *t)
 {
-       struct initial_delay_task *dt = t->private_data;
+       struct initial_delay_task *idt = container_of(t, struct initial_delay_task, task);
        struct timeval diff;
 
-       t->ret = -E_NO_DELAY;
-       if (!dt->start_time.tv_sec && !dt->start_time.tv_usec)
+       if (!idt->start_time.tv_sec && !idt->start_time.tv_usec) {
+               t->error = -E_NO_DELAY;
                return;
-       t->ret = -E_DELAY_TIMEOUT;
-       if (tv_diff(now, &dt->start_time, &diff) > 0)
+       }
+       if (tv_diff(now, &idt->start_time, &diff) > 0) {
+               t->error = -E_DELAY_TIMEOUT;
                return;
-       t->ret = 1;
+       }
        if (tv_diff(&s->timeout , &diff, NULL) > 0)
                s->timeout = diff;
 }
@@ -106,6 +121,7 @@ static struct writer_node_group *check_args(void)
 {
        int i, ret = -E_WRITE_SYNTAX;
        struct writer_node_group *g = NULL;
+       struct initial_delay_task *idt = &the_initial_delay_task;
 
        if (conf.list_writers_given) {
                char *msg = NULL;
@@ -126,8 +142,8 @@ static struct writer_node_group *check_args(void)
                if (sscanf(conf.start_time_arg, "%lu:%lu",
                                &sec, &usec) != 2)
                        goto out;
-               idt.start_time.tv_sec = sec;
-               idt.start_time.tv_usec = usec;
+               idt->start_time.tv_sec = sec;
+               idt->start_time.tv_usec = usec;
        }
        if (!conf.writer_given) {
                g = setup_default_wng();
@@ -152,54 +168,6 @@ out:
        return NULL;
 }
 
-static void wng_event_handler(struct task *t)
-{
-       struct writer_node_group *g = t->private_data;
-
-       PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
-       unregister_task(t);
-       wng_close(g);
-}
-
-
-static void idt_event_handler(struct task *t)
-{
-       int ret;
-
-       PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
-       unregister_task(t);
-       wng->buf = sit.buf;
-       wng->loaded = &sit.loaded;
-       wng->input_error = &sit.error;
-       wng->task.event_handler = wng_event_handler;
-       wng->channels = &cwt.channels;
-       wng->samplerate = &cwt.samplerate;
-       ret = wng_open(wng);
-       if (ret < 0) {
-               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
-               exit(EXIT_FAILURE);
-       }
-}
-
-static void cwt_event_handler(struct task *t)
-{
-       if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_WAV_HEADER_SUCCESS) {
-               PARA_ERROR_LOG("%s\n", para_strerror(-t->ret));
-               exit(EXIT_FAILURE);
-       }
-       PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
-       unregister_task(t);
-//     if (t->ret == -E_WAV_HEADER_SUCCESS) {
-//             conf.channels_arg = cwt.channels;
-//             conf.sample_rate_arg = cwt.sample_rate;
-//     }
-       idt.task.pre_select = initial_delay_pre_select;
-       idt.task.private_data = &idt;
-       idt.task.event_handler = idt_event_handler;
-       sprintf(idt.task.status, "initial_delay");
-       register_task(&idt.task);
-}
-
 /**
  * Para_write's main function.
  *
@@ -215,6 +183,8 @@ int main(int argc, char *argv[])
 {
        int ret = -E_WRITE_SYNTAX;
        struct sched s;
+       struct check_wav_task *cwt = &the_check_wav_task;
+       struct initial_delay_task *idt = &the_initial_delay_task;
 
        write_cmdline_parser(argc, argv, &conf);
        HANDLE_VERSION_FLAG("write", conf);
@@ -226,27 +196,33 @@ int main(int argc, char *argv[])
        stdin_set_defaults(&sit);
        if (conf.bufsize_given)
                sit.bufsize = conf.bufsize_arg;
-       sit.buf = para_malloc(sit.bufsize),
+       sit.buf = para_malloc(sit.bufsize);
+
+       wng->buf = sit.buf;
+       wng->loaded = &sit.loaded;
+       wng->input_error = &sit.task.error;
+
        register_task(&sit.task);
 
-       cwt.task.pre_select = check_wav_pre_select;
-       cwt.task.private_data = &cwt;
-       cwt.task.event_handler = cwt_event_handler;
-       cwt.buf = sit.buf;
-       cwt.loaded = &sit.loaded;
-       cwt.error = &sit.error;
-       sprintf(cwt.task.status, "check wav");
-       register_task(&cwt.task);
+       cwt->buf = sit.buf;
+       cwt->loaded = &sit.loaded;
+       cwt->input_error = &sit.task.error;
+       sprintf(cwt->task.status, "check wav");
+       cwt->task.pre_select = check_wav_pre_select;
+       register_task(&cwt->task);
+
+       idt->task.pre_select = initial_delay_pre_select;
+       sprintf(idt->task.status, "initial_delay");
+       register_task(&idt->task);
 
-       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_sec = 10;
        s.default_timeout.tv_usec = 0;
        ret = schedule(&s);
-
+       wng_close(wng);
 out:
        if (ret < 0) {
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
-               ret = EXIT_FAILURE;
-       } else
-               ret = EXIT_SUCCESS;
-       return ret;
+               exit(EXIT_FAILURE);
+       }
+       exit(EXIT_SUCCESS);
 }
diff --git a/write.h b/write.h
index 1a3f193..a73ada5 100644 (file)
--- a/write.h
+++ b/write.h
@@ -103,8 +103,6 @@ struct writer_node_group {
        size_t max_chunk_bytes;
        /** Non-zero if an error or end of file was encountered by the feeding task. */
        int *input_error;
-       /** Non-zero if an error occurred or end of file was encountered. */
-       int error;
        /** current output buffer */
        char *buf;
        /** number of bytes loaded in the output buffer */
index 29a5e53..f9560d7 100644 (file)
@@ -21,49 +21,43 @@ struct writer writers[NUM_SUPPORTED_WRITERS] = {WRITER_ARRAY};
 
 static void wng_pre_select(__a_unused struct sched *s, struct task *t)
 {
-       struct writer_node_group *g = t->private_data;
+       struct writer_node_group *g = container_of(t, struct writer_node_group, task);
        int i;
 
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
-               t->ret = wn->writer->pre_select(s, wn);
-               if (t->ret < 0) {
-                       g->error = t->ret;
+               t->error = wn->writer->pre_select(s, wn);
+               if (t->error < 0)
                        return;
-               }
        }
 }
 
 static void wng_post_select(struct sched *s, struct task *t)
 {
-       struct writer_node_group *g = t->private_data;
+       struct writer_node_group *g = container_of(t, struct writer_node_group, task);
        int i;
        size_t min_written = 0;
 
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
-               t->ret = wn->writer->post_select(s, wn);
-               if (t->ret < 0) {
-                       g->error = t->ret;
+               t->error = wn->writer->post_select(s, wn);
+               if (t->error < 0)
                        return;
-               }
                if (!i)
                        min_written = wn->written;
                else
                        min_written = PARA_MIN(min_written, wn->written);
        }
-//     PARA_INFO_LOG("loaded: %zd, min_written: %zd bytes\n", *g->loaded, min_written);
+       //PARA_INFO_LOG("loaded: %zd, min_written: %zd bytes\n", *g->loaded, min_written);
        if (min_written) {
                *g->loaded -= min_written;
                FOR_EACH_WRITER_NODE(i, g)
                        g->writer_nodes[i].written -= min_written;
        }
        if (!*g->loaded && *g->input_error) {
-               g->error = *g->input_error;
-               t->ret = g->error;
+               t->error = *g->input_error;
                return;
        }
-       t->ret = 1;
        if (*g->loaded && min_written) {
 //             PARA_INFO_LOG("moving %zd bytes\n", *g->loaded);
                memmove(g->buf, g->buf + min_written, *g->loaded);
@@ -96,7 +90,6 @@ int wng_open(struct writer_node_group *g)
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
        }
        sprintf(g->task.status, "%s", "writer node group");
-       g->error = 0;
        return 1;
 err_out:
        PARA_ERROR_LOG("%s\n", para_strerror(-ret));
@@ -106,7 +99,6 @@ err_out:
                wn->writer->close(wn);
        }
        g->num_writers = 0;
-       g->error = ret;
        return ret;
 }
 
@@ -156,7 +148,6 @@ struct writer_node_group *wng_new(unsigned num_writers)
        g->num_writers = num_writers;
        g->writer_nodes = para_calloc(num_writers
                * sizeof(struct writer_node));
-       g->task.private_data = g;
        g->task.post_select = wng_post_select;
        g->task.pre_select = wng_pre_select;
        return g;