static void signal_pre_select(struct sched *s, struct task *t)
{
- struct signal_task *st = t->private_data;
- t->ret = 1;
+ struct signal_task *st = container_of(t, struct signal_task, task);
para_fd_set(st->fd, &s->rfds, &s->max_fileno);
}
static void signal_post_select(struct sched *s, struct task *t)
{
- struct signal_task *st = t->private_data;
- t->ret = -E_AFS_PARENT_DIED;
- if (getppid() == 1)
- goto err;
- t->ret = 1;
+ struct signal_task *st = container_of(t, struct signal_task, task);
+ if (getppid() == 1) {
+ t->error = -E_AFS_PARENT_DIED;
+ return;
+ }
if (!FD_ISSET(st->fd, &s->rfds))
return;
st->signum = para_next_signal();
- t->ret = 1;
if (st->signum == SIGHUP) {
close_afs_tables();
- t->ret = open_afs_tables();
- if (t->ret < 0)
- goto err;
+ t->error = open_afs_tables();
+ if (t->error < 0)
+ return;
init_admissible_files(current_mop);
return;
}
- t->ret = -E_AFS_SIGNAL;
-err:
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- unregister_tasks();
+ t->error = -E_AFS_SIGNAL;
}
static void register_signal_task(void)
st->task.pre_select = signal_pre_select;
st->task.post_select = signal_post_select;
- st->task.private_data = st;
sprintf(st->task.status, "signal task");
register_task(&st->task);
}
static void command_pre_select(struct sched *s, struct task *t)
{
- struct command_task *ct = t->private_data;
+ struct command_task *ct = container_of(t, struct command_task, task);
struct afs_client *client;
para_fd_set(server_socket, &s->rfds, &s->max_fileno);
para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
list_for_each_entry(client, &afs_client_list, node)
para_fd_set(client->fd, &s->rfds, &s->max_fileno);
- t->ret = 1;
}
/**
static void command_post_select(struct sched *s, struct task *t)
{
- struct command_task *ct = t->private_data;
+ struct command_task *ct = container_of(t, struct command_task, task);
struct sockaddr_un unix_addr;
struct afs_client *client, *tmp;
- int fd;
+ int fd, ret;
if (FD_ISSET(server_socket, &s->rfds))
execute_server_command();
}
/* Accept connections on the local socket. */
if (!FD_ISSET(ct->fd, &s->rfds))
- goto out;
- t->ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
- if (t->ret < 0) {
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- goto out;
+ return;
+ ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
+ if (ret < 0) {
+ PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+ return;
}
- fd = t->ret;
- t->ret = mark_fd_nonblocking(fd);
- if (t->ret < 0) {
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
+ fd = ret;
+ ret = mark_fd_nonblocking(fd);
+ if (ret < 0) {
+ PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
close(fd);
- goto out;
+ return;
}
client = para_malloc(sizeof(*client));
client->fd = fd;
client->connect_time = *now;
para_list_add(&client->node, &afs_client_list);
-out:
- t->ret = 1;
}
static void register_command_task(uint32_t cookie)
ct->task.pre_select = command_pre_select;
ct->task.post_select = command_post_select;
- ct->task.private_data = ct;
sprintf(ct->task.status, "command task");
register_task(&ct->task);
}
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
- if (s->wng && !s->wng->error) {
+ if (s->wng && !s->wng->task.error) {
PARA_INFO_LOG("unregistering writer node group in slot %d\n",
i);
wng_unregister(s->wng);
- s->wng->error = error;
+ s->wng->task.error = error;
}
- if (s->fc && !s->fc->error) {
+ if (s->fc && !s->fc->task.error) {
PARA_INFO_LOG("unregistering filter chain in slot %d\n", i);
unregister_task(&s->fc->task);
- s->fc->error = error;
+ s->fc->task.error = error;
}
- if (s->receiver_node && !s->receiver_node->error) {
+ if (s->receiver_node && !s->receiver_node->task.error) {
PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i);
unregister_task(&s->receiver_node->task);
- s->receiver_node->error = error;
+ s->receiver_node->task.error = error;
}
}
}
return afi[audio_format_num].num_filters;
}
-static void filter_event_handler(struct task *t)
-{
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- struct filter_chain *fc = t->private_data;
- fc->error = t->ret;
- unregister_task(t);
-}
-
static void open_filters(int slot_num)
{
struct slot_info *s = &slot[slot_num];
INIT_LIST_HEAD(&s->fc->filters);
s->fc->inbuf = s->receiver_node->buf;
s->fc->in_loaded = &s->receiver_node->loaded;
- s->fc->input_error = &s->receiver_node->error;
+ s->fc->input_error = &s->receiver_node->task.error;
s->fc->task.pre_select = filter_pre_select;
- s->fc->task.event_handler = filter_event_handler;
- s->fc->task.private_data = s->fc;
- s->fc->error = 0;
+ s->fc->task.error = 0;
- s->receiver_node->output_error = &s->fc->error;
+ s->receiver_node->output_error = &s->fc->task.error;
sprintf(s->fc->task.status, "filter chain");
for (i = 0; i < nf; i++) {
struct filter_node *fn = para_calloc(sizeof(struct filter_node));
register_task(&s->fc->task);
}
-static void wng_event_handler(struct task *t)
-{
- struct writer_node_group *wng = t->private_data;
-
- PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
- wng->error = t->ret;
- wng_unregister(wng);
-}
-
static void open_writers(int slot_num)
{
int ret, i;
if (s->fc) {
s->wng->buf = s->fc->outbuf;
s->wng->loaded = s->fc->out_loaded;
- s->wng->input_error = &s->fc->error;
+ s->wng->input_error = &s->fc->task.error;
s->wng->channels = &s->fc->channels;
s->wng->samplerate = &s->fc->samplerate;
- s->fc->output_error = &s->wng->error;
+ s->fc->output_error = &s->wng->task.error;
PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate);
} else {
s->wng->buf = s->receiver_node->buf;
s->wng->loaded = &s->receiver_node->loaded;
- s->wng->input_error = &s->receiver_node->error;
+ s->wng->input_error = &s->receiver_node->task.error;
}
- s->wng->task.event_handler = wng_event_handler;
for (i = 0; i < a->num_writers; i++) {
s->wng->writer_nodes[i].conf = a->writer_conf[i];
s->wng->writer_nodes[i].writer = a->writers[i];
activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters);
}
+#if 0
static void rn_event_handler(struct task *t)
{
struct receiver_node *rn = t->private_data;
tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier);
}
}
+#endif
static int open_receiver(int format)
{
}
PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
audio_formats[s->format], a->receiver->name, slot_num);
- rn->task.private_data = s->receiver_node;
rn->task.pre_select = a->receiver->pre_select;
rn->task.post_select = a->receiver->post_select;
- rn->task.event_handler = rn_event_handler;
sprintf(rn->task.status, "%s receiver node", rn->receiver->name);
register_task(&rn->task);
return 1;
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
if (s->format == format && s->receiver_node
- && !s->receiver_node->error)
+ && s->receiver_node->task.error >= 0)
return 1;
}
return 0;
struct timeval diff;
int cafn = stat_task->current_audio_format_num;
- if (cafn < 0 || !stat_task->pcd)
+ if (cafn < 0 || !stat_task->ct)
return 0;
if (receiver_running(cafn))
return 0;
long unsigned sec, usec;
char *tmp;
-// PARA_INFO_LOG("line: %s\n", line);
+ //PARA_INFO_LOG("line: %s\n", line);
if (!line)
return 1;
itemnum = stat_line_valid(line);
switch (itemnum) {
case SI_STATUS:
stat_task->playing = strstr(line, "playing")? 1 : 0;
+ PARA_INFO_LOG("stat task playing: %d\n", stat_task->playing);
break;
case SI_OFFSET:
stat_task->offset_seconds = atoi(line + ilen + 1);
if (s->format < 0)
return;
- if (s->receiver_node && !s->receiver_node->error)
+ if (s->receiver_node && s->receiver_node->task.error >= 0)
return;
- if (s->fc && !s->fc->error)
+ if (s->fc && s->fc->task.error >= 0)
return;
- if (s->wng && !s->wng->error)
+ if (s->wng && s->wng->task.error >= 0)
return;
PARA_INFO_LOG("closing slot %d\n", slot_num);
wng_close(s->wng);
int i;
struct timeval min_delay = {0, 1};
- t->ret = 1;
if (audiod_status != AUDIOD_ON || !stat_task->playing)
return kill_all_decoders(-E_NOT_PLAYING);
if (open_current_receiver(s))
{
int i;
- t->ret = 1;
FOR_EACH_SLOT(i)
try_to_close_slot(i);
}
{
t->pre_select = audiod_pre_select;
t->post_select = audiod_post_select;
- t->event_handler = NULL;
- t->private_data = t;
+ t->error = 0;
sprintf(t->status, "audiod task");
}
exit(EXIT_FAILURE);
}
-static void signal_event_handler(struct task *t)
-{
- struct signal_task *st = t->private_data;
-
- switch (st->signum) {
- case SIGINT:
- case SIGTERM:
- case SIGHUP:
- PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
- clean_exit(EXIT_FAILURE, "caught deadly signal");
- }
-}
-
static void signal_pre_select(struct sched *s, struct task *t)
{
- struct signal_task *st = t->private_data;
- t->ret = 1;
+ struct signal_task *st = container_of(t, struct signal_task, task);
para_fd_set(st->fd, &s->rfds, &s->max_fileno);
}
static void signal_post_select(struct sched *s, struct task *t)
{
- struct signal_task *st = t->private_data;
- t->ret = 1;
+ struct signal_task *st = container_of(t, struct signal_task, task);
+ int signum;
+
if (!FD_ISSET(st->fd, &s->rfds))
return;
- t->ret = -E_SIGNAL_CAUGHT;
- st->signum = para_next_signal();
+
+ signum = para_next_signal();
+ switch (signum) {
+ case SIGINT:
+ case SIGTERM:
+ case SIGHUP:
+ PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+ clean_exit(EXIT_FAILURE, "caught deadly signal");
+ }
}
static void signal_setup_default(struct signal_task *st)
{
st->task.pre_select = signal_pre_select;
st->task.post_select = signal_post_select;
- st->task.private_data = st;
sprintf(st->task.status, "signal task");
}
static void command_pre_select(struct sched *s, struct task *t)
{
- struct command_task *ct = t->private_data;
- t->ret = 1;
+ struct command_task *ct = container_of(t, struct command_task, task);
para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
-
}
static void command_post_select(struct sched *s, struct task *t)
{
int ret;
- struct command_task *ct = t->private_data;
+ struct command_task *ct = container_of(t, struct command_task, task);
- t->ret = 1; /* always successful */
audiod_status_dump();
if (!FD_ISSET(ct->fd, &s->rfds))
return;
{
ct->task.pre_select = command_pre_select;
ct->task.post_select = command_post_select;
- ct->task.event_handler = NULL;
- ct->task.private_data = ct;
+ ct->task.error = 0;
ct->fd = audiod_get_socket(); /* doesn't return on errors */
sprintf(ct->task.status, "command task");
}
{
int i;
- if (!stat_task->pcd)
+ if (!stat_task->ct)
return;
- client_close(stat_task->pcd);
- stat_task->pcd = NULL;
+ client_close(stat_task->ct);
+ stat_task->ct = NULL;
FOR_EACH_STATUS_ITEM(i) {
free(stat_task->stat_item_values[i]);
stat_task->stat_item_values[i] = NULL;
tv_add(now, &delay, &stat_task->restart_barrier);
}
+#if 0
static void client_task_event_handler(__a_unused struct task *t)
{
int i;
FOR_EACH_AUDIO_FORMAT(i)
afi[i].restart_barrier = stat_task->restart_barrier;
}
+#endif
static void status_pre_select(struct sched *s, struct task *t)
{
- struct status_task *st = t->private_data;
+ struct status_task *st = container_of(t, struct status_task, task);
int ret;
- t->ret = 1; /* always successful */
- if (st->pcd || audiod_status == AUDIOD_OFF)
+ if (st->ct || audiod_status == AUDIOD_OFF)
return;
if (!st->clock_diff_count && tv_diff(now, &st->restart_barrier, NULL)
< 0)
if (tv_diff(now, &st->clock_diff_barrier, NULL) < 0)
return;
PARA_INFO_LOG("clock diff count: %d\n", st->clock_diff_count);
- ret = client_open(argc, argv, &st->pcd);
+ ret = client_open(argc, argv, &st->ct);
} else {
char *argv[] = {"audiod", "stat", NULL};
int argc = 2;
- ret = client_open(argc, argv, &st->pcd);
+ ret = client_open(argc, argv, &st->ct);
}
set_stat_task_restart_barrier();
if (ret < 0)
return;
- st->pcd->task.event_handler = client_task_event_handler;
s->timeout.tv_sec = 0;
s->timeout.tv_usec = 1;
}
static void status_post_select(__a_unused struct sched *s, struct task *t)
{
- struct status_task *st = t->private_data;
+ struct status_task *st = container_of(t, struct status_task, task);
unsigned bytes_left;
- t->ret = 1;
- if (!st->pcd || st->pcd->status != CL_RECEIVING)
+ if (!st->ct || st->ct->status != CL_RECEIVING)
return;
- if (st->pcd && audiod_status == AUDIOD_OFF) {
- unregister_task(&st->pcd->task);
+ if (st->ct && audiod_status == AUDIOD_OFF) {
+ unregister_task(&st->ct->task);
close_stat_pipe();
st->clock_diff_count = conf.clock_diff_count_arg;
return;
}
- bytes_left = for_each_line(st->pcd->buf, st->pcd->loaded,
+ bytes_left = for_each_line(st->ct->buf, st->ct->loaded,
&check_stat_line, NULL);
- if (st->pcd->loaded != bytes_left) {
+ if (st->ct->loaded != bytes_left) {
st->last_status_read = *now;
- st->pcd->loaded = bytes_left;
+ st->ct->loaded = bytes_left;
} else {
struct timeval diff;
tv_diff(now, &st->last_status_read, &diff);
memset(st, 0, sizeof(struct status_task));
st->task.pre_select = status_pre_select;
st->task.post_select = status_post_select;
- st->task.private_data = st;
st->sa_time_diff_sign = 1;
st->clock_diff_count = conf.clock_diff_count_arg;
st->current_audio_format_num = -1;
init_grabbing();
setup_signal_handling();
signal_setup_default(sig_task);
- sig_task->task.event_handler = signal_event_handler;
init_status_task(stat_task);
init_command_task(cmd_task);
/** the associated task structure of audiod */
struct task task;
/** client data associated with the stat task */
- struct private_client_data *pcd;
+ struct client_task *ct;
/** the array of status items sent by para_server */
char *stat_item_values[NUM_STAT_ITEMS];
/** do not restart client command until this time */
INIT_CLIENT_ERRLISTS;
-static struct private_client_data *pcd;
+static struct client_task *ct;
static struct stdin_task sit;
static struct stdout_task sot;
-
-INIT_STDERR_LOGGING(pcd->conf.loglevel_arg);
-
-static void client_event_handler(struct task *t)
+static void supervisor_pre_select(struct sched *s, struct task *t)
{
- struct private_client_data *p = t->private_data;
-
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- if (t->ret != -E_HANDSHAKE_COMPLETE) {
- unregister_task(t);
- p->error = t->ret;
+ if (ct->task.error < 0) {
+ t->error = ct->task.error;
return;
}
- if (p->status == CL_SENDING) {
+ if (ct->status == CL_SENDING) {
stdin_set_defaults(&sit);
sit.buf = para_malloc(sit.bufsize),
register_task(&sit.task);
- p->inbuf = sit.buf;
- p->in_loaded = &sit.loaded;
- p->in_error = &sit.error;
- return;
+ ct->inbuf = sit.buf;
+ ct->in_loaded = &sit.loaded;
+ ct->in_error = &sit.task.error;
+ t->error = -1;
+ goto min_delay;
}
- stdout_set_defaults(&sot);
- sot.buf = p->buf;
- sot.loaded = &p->loaded;
- sot.input_error = &p->error;
- register_task(&sot.task);
+ if (ct->status == CL_RECEIVING) {
+ stdout_set_defaults(&sot);
+ sot.buf = ct->buf;
+ sot.loaded = &ct->loaded;
+ sot.input_error = &ct->task.error;
+ register_task(&sot.task);
+ t->error = -1;
+ goto min_delay;
+ }
+ return;
+min_delay:
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
}
+static struct task svt = {
+ .pre_select = supervisor_pre_select
+};
+
+INIT_STDERR_LOGGING(ct->conf.loglevel_arg);
+
+
/**
- * the client program to connect to para_server
+ * The client program to connect to para_server.
*
- * \param argc usual argument count
- * \param argv usual argument vector
+ * \param argc Usual argument count.
+ * \param argv Usual argument vector.
*
* It registers two tasks: The client task that communicates with para_server
* and the standard out task that writes any output produced by the client task
s.default_timeout.tv_sec = 1;
s.default_timeout.tv_usec = 0;
- ret = client_open(argc, argv, &pcd);
+ ret = client_open(argc, argv, &ct);
if (ret < 0) /* can not use PARA_LOG here */
exit(EXIT_FAILURE);
- pcd->task.event_handler = client_event_handler;
+ register_task(&svt);
ret = schedule(&s);
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- client_close(pcd);
+ client_close(ct);
return ret >= 0? EXIT_SUCCESS: EXIT_FAILURE;
}
/**
* data specific to a client task
*/
-struct private_client_data {
+struct client_task {
/** the state of the connection */
int status;
/** the file descriptor */
RC4_KEY rc4_send_key;
/** the client task structure */
struct task task;
- /** non-zero if task is unregistered */
- int error;
/** the buffer used for handshake and receiving */
char buf[CLIENT_BUFSIZE];
/** number of bytes loaded in \p buf */
int *in_error;
};
-void client_close(struct private_client_data *pcd);
-int client_open(int argc, char *argv[], struct private_client_data **pcd_ptr);
+void client_close(struct client_task *ct);
+int client_open(int argc, char *argv[], struct client_task **ct);
static void rc4_send(unsigned long len, const unsigned char *indata,
unsigned char *outdata, void *private_data)
{
- struct private_client_data *pcd = private_data;
- RC4(&pcd->rc4_send_key, len, indata, outdata);
+ struct client_task *ct = private_data;
+ RC4(&ct->rc4_send_key, len, indata, outdata);
}
/*
static void rc4_recv(unsigned long len, const unsigned char *indata,
unsigned char *outdata, void *private_data)
{
- struct private_client_data *pcd = private_data;
- RC4(&pcd->rc4_recv_key, len, indata, outdata);
+ struct client_task *ct = private_data;
+ RC4(&ct->rc4_recv_key, len, indata, outdata);
}
/**
* Close the connection to para_server and free all resources.
*
- * \param pcd Pointer to the client data.
+ * \param ct Pointer to the client data.
*
* \sa client_open.
*/
-void client_close(struct private_client_data *pcd)
+void client_close(struct client_task *ct)
{
- if (!pcd)
+ if (!ct)
return;
- if (pcd->fd >= 0) {
- disable_crypt(pcd->fd);
- close(pcd->fd);
+ if (ct->fd >= 0) {
+ disable_crypt(ct->fd);
+ close(ct->fd);
}
- free(pcd->user);
- free(pcd->config_file);
- free(pcd->key_file);
- client_cmdline_parser_free(&pcd->conf);
- free(pcd);
+ free(ct->user);
+ free(ct->config_file);
+ free(ct->key_file);
+ client_cmdline_parser_free(&ct->conf);
+ free(ct);
}
/**
*/
static void client_pre_select(struct sched *s, struct task *t)
{
- struct private_client_data *pcd = t->private_data;
+ struct client_task *ct = container_of(t, struct client_task, task);
- t->ret = 1;
- pcd->check_r = 0;
- pcd->check_w = 0;
- if (pcd->fd < 0)
+ ct->check_r = 0;
+ ct->check_w = 0;
+ if (ct->fd < 0)
return;
- switch (pcd->status) {
+ switch (ct->status) {
case CL_CONNECTED:
case CL_SENT_AUTH:
case CL_SENT_CH_RESPONSE:
case CL_SENT_COMMAND:
- para_fd_set(pcd->fd, &s->rfds, &s->max_fileno);
- pcd->check_r = 1;
+ para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+ ct->check_r = 1;
return;
case CL_RECEIVED_WELCOME:
case CL_RECEIVED_CHALLENGE:
case CL_RECEIVED_PROCEED:
- para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
- pcd->check_w = 1;
+ para_fd_set(ct->fd, &s->wfds, &s->max_fileno);
+ ct->check_w = 1;
return;
case CL_RECEIVING:
- if (pcd->loaded < CLIENT_BUFSIZE - 1) {
- para_fd_set(pcd->fd, &s->rfds, &s->max_fileno);
- pcd->check_r = 1;
+ if (ct->loaded < CLIENT_BUFSIZE - 1) {
+ para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+ ct->check_r = 1;
}
return;
case CL_SENDING:
- if (*pcd->in_loaded) {
- PARA_INFO_LOG("loaded: %zd\n", *pcd->in_loaded);
- para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
- pcd->check_w = 1;
+ if (*ct->in_loaded) {
+ PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
+ para_fd_set(ct->fd, &s->wfds, &s->max_fileno);
+ ct->check_w = 1;
} else {
- if (*pcd->in_error) {
- t->ret = *pcd->in_error;
+ if (*ct->in_error) {
+ t->error = *ct->in_error;
s->timeout.tv_sec = 0;
s->timeout.tv_usec = 1;
}
}
}
-static ssize_t client_recv_buffer(struct private_client_data *pcd)
+static ssize_t client_recv_buffer(struct client_task *ct)
{
- ssize_t ret = recv_buffer(pcd->fd, pcd->buf + pcd->loaded,
- CLIENT_BUFSIZE - pcd->loaded);
+ ssize_t ret = recv_buffer(ct->fd, ct->buf + ct->loaded,
+ CLIENT_BUFSIZE - ct->loaded);
if (!ret)
return -E_SERVER_EOF;
if (ret > 0)
- pcd->loaded += ret;
+ ct->loaded += ret;
return ret;
}
*/
static void client_post_select(struct sched *s, struct task *t)
{
- struct private_client_data *pcd = t->private_data;
+ struct client_task *ct = container_of(t, struct client_task, task);
-// PARA_INFO_LOG("status %d\n", pcd->status);
- t->ret = 1;
- if (pcd->fd < 0)
+ t->error = 0;
+ if (ct->fd < 0)
return;
- if (!pcd->check_r && !pcd->check_w)
+ if (!ct->check_r && !ct->check_w)
return;
- if (pcd->check_r && !FD_ISSET(pcd->fd, &s->rfds))
+ if (ct->check_r && !FD_ISSET(ct->fd, &s->rfds))
return;
- if (pcd->check_w && !FD_ISSET(pcd->fd, &s->wfds))
+ if (ct->check_w && !FD_ISSET(ct->fd, &s->wfds))
return;
- switch (pcd->status) {
+ switch (ct->status) {
case CL_CONNECTED: /* receive welcome message */
- t->ret = client_recv_buffer(pcd);
- if (t->ret > 0)
- pcd->status = CL_RECEIVED_WELCOME;
+ t->error = client_recv_buffer(ct);
+ if (t->error > 0)
+ ct->status = CL_RECEIVED_WELCOME;
return;
case CL_RECEIVED_WELCOME: /* send auth command */
- sprintf(pcd->buf, "auth %s%s", pcd->conf.plain_given?
- "" : "rc4 ", pcd->user);
- PARA_INFO_LOG("--> %s\n", pcd->buf);
- t->ret = send_buffer(pcd->fd, pcd->buf);
- if (t->ret >= 0)
- pcd->status = CL_SENT_AUTH;
+ sprintf(ct->buf, "auth %s%s", ct->conf.plain_given?
+ "" : "rc4 ", ct->user);
+ PARA_INFO_LOG("--> %s\n", ct->buf);
+ t->error = send_buffer(ct->fd, ct->buf);
+ if (t->error >= 0)
+ ct->status = CL_SENT_AUTH;
return;
case CL_SENT_AUTH: /* receive challenge number */
- pcd->loaded = 0;
- t->ret = client_recv_buffer(pcd);
- if (t->ret < 0)
+ ct->loaded = 0;
+ t->error = client_recv_buffer(ct);
+ if (t->error < 0)
return;
- if (t->ret != 64) {
- t->ret = -E_INVALID_CHALLENGE;
- PARA_ERROR_LOG("received the following: %s\n", pcd->buf);
+ if (t->error != 64) {
+ t->error = -E_INVALID_CHALLENGE;
+ PARA_ERROR_LOG("received the following: %s\n", ct->buf);
return;
}
PARA_INFO_LOG("<-- [challenge]\n");
/* decrypt challenge number */
- t->ret = para_decrypt_challenge(pcd->key_file, &pcd->challenge_nr,
- (unsigned char *) pcd->buf, 64);
- if (t->ret > 0)
- pcd->status = CL_RECEIVED_CHALLENGE;
+ t->error = para_decrypt_challenge(ct->key_file, &ct->challenge_nr,
+ (unsigned char *) ct->buf, 64);
+ if (t->error > 0)
+ ct->status = CL_RECEIVED_CHALLENGE;
return;
case CL_RECEIVED_CHALLENGE: /* send decrypted challenge */
- PARA_INFO_LOG("--> %lu\n", pcd->challenge_nr);
- t->ret = send_va_buffer(pcd->fd, "%s%lu", CHALLENGE_RESPONSE_MSG,
- pcd->challenge_nr);
- if (t->ret > 0)
- pcd->status = CL_SENT_CH_RESPONSE;
+ PARA_INFO_LOG("--> %lu\n", ct->challenge_nr);
+ t->error = send_va_buffer(ct->fd, "%s%lu", CHALLENGE_RESPONSE_MSG,
+ ct->challenge_nr);
+ if (t->error > 0)
+ ct->status = CL_SENT_CH_RESPONSE;
return;
case CL_SENT_CH_RESPONSE: /* read server response */
{
size_t bytes_received;
unsigned char rc4_buf[2 * RC4_KEY_LEN] = "";
- pcd->loaded = 0;
- t->ret = client_recv_buffer(pcd);
- if (t->ret < 0)
+ ct->loaded = 0;
+ t->error = client_recv_buffer(ct);
+ if (t->error < 0)
return;
- bytes_received = t->ret;
+ bytes_received = t->error;
PARA_DEBUG_LOG("++++ server info ++++\n%s\n++++ end of server "
- "info ++++\n", pcd->buf);
+ "info ++++\n", ct->buf);
/* check if server has sent "Proceed" message */
- t->ret = -E_CLIENT_AUTH;
- if (!strstr(pcd->buf, PROCEED_MSG))
+ t->error = -E_CLIENT_AUTH;
+ if (!strstr(ct->buf, PROCEED_MSG))
return;
- t->ret = 1;
- pcd->status = CL_RECEIVED_PROCEED;
+ t->error = 0;
+ ct->status = CL_RECEIVED_PROCEED;
if (bytes_received < PROCEED_MSG_LEN + 32)
return;
PARA_INFO_LOG("decrypting session key\n");
- t->ret = para_decrypt_buffer(pcd->key_file, rc4_buf,
- (unsigned char *)pcd->buf + PROCEED_MSG_LEN + 1,
+ t->error = para_decrypt_buffer(ct->key_file, rc4_buf,
+ (unsigned char *)ct->buf + PROCEED_MSG_LEN + 1,
bytes_received - PROCEED_MSG_LEN - 1);
- if (t->ret < 0)
+ if (t->error < 0)
return;
- RC4_set_key(&pcd->rc4_send_key, RC4_KEY_LEN, rc4_buf);
- RC4_set_key(&pcd->rc4_recv_key, RC4_KEY_LEN, rc4_buf + RC4_KEY_LEN);
- enable_crypt(pcd->fd, rc4_recv, rc4_send, pcd);
+ RC4_set_key(&ct->rc4_send_key, RC4_KEY_LEN, rc4_buf);
+ RC4_set_key(&ct->rc4_recv_key, RC4_KEY_LEN, rc4_buf + RC4_KEY_LEN);
+ enable_crypt(ct->fd, rc4_recv, rc4_send, ct);
}
case CL_RECEIVED_PROCEED: /* concat args and send command */
{
int i;
char *command = NULL;
- for (i = 0; i < pcd->conf.inputs_num; i++) {
+ for (i = 0; i < ct->conf.inputs_num; i++) {
char *tmp = command;
command = make_message("%s\n%s", command?
- command : "", pcd->conf.inputs[i]);
+ command : "", ct->conf.inputs[i]);
free(tmp);
}
command = para_strcat(command, EOC_MSG "\n");
PARA_DEBUG_LOG("--> %s\n", command);
- t->ret = send_buffer(pcd->fd, command);
+ t->error = send_buffer(ct->fd, command);
free(command);
- if (t->ret > 0)
- pcd->status = CL_SENT_COMMAND;
+ if (t->error > 0)
+ ct->status = CL_SENT_COMMAND;
return;
}
case CL_SENT_COMMAND:
- pcd->loaded = 0;
- t->ret = client_recv_buffer(pcd);
- if (t->ret < 0)
+ ct->loaded = 0;
+ t->error = client_recv_buffer(ct);
+ if (t->error < 0)
return;
- t->ret = -E_HANDSHAKE_COMPLETE;
- if (strstr(pcd->buf, AWAITING_DATA_MSG))
- pcd->status = CL_SENDING;
+ if (strstr(ct->buf, AWAITING_DATA_MSG))
+ ct->status = CL_SENDING;
else
- pcd->status = CL_RECEIVING;
+ ct->status = CL_RECEIVING;
return;
case CL_SENDING: /* FIXME: might block */
- PARA_INFO_LOG("loaded: %zd\n", *pcd->in_loaded);
- t->ret = send_bin_buffer(pcd->fd, pcd->inbuf, *pcd->in_loaded);
- if (t->ret < 0)
+ PARA_INFO_LOG("loaded: %zd\n", *ct->in_loaded);
+ t->error = send_bin_buffer(ct->fd, ct->inbuf, *ct->in_loaded);
+ if (t->error < 0)
return;
- *pcd->in_loaded = 0;
+ *ct->in_loaded = 0;
return;
case CL_RECEIVING:
- t->ret = client_recv_buffer(pcd);
+ t->error = client_recv_buffer(ct);
return;
}
}
/* connect to para_server and register the client task */
-static int client_connect(struct private_client_data *pcd)
+static int client_connect(struct client_task *ct)
{
int ret;
- pcd->fd = -1;
- ret = makesock(AF_UNSPEC, IPPROTO_TCP, 0, pcd->conf.hostname_arg,
- pcd->conf.server_port_arg);
+ ct->fd = -1;
+ ret = makesock(AF_UNSPEC, IPPROTO_TCP, 0, ct->conf.hostname_arg,
+ ct->conf.server_port_arg);
if (ret < 0)
return ret;
- pcd->fd = ret;
- pcd->status = CL_CONNECTED;
- ret = mark_fd_nonblocking(pcd->fd);
+ ct->fd = ret;
+ ct->status = CL_CONNECTED;
+ ret = mark_fd_nonblocking(ct->fd);
if (ret < 0)
goto err_out;
- pcd->task.pre_select = client_pre_select;
- pcd->task.post_select = client_post_select;
- pcd->task.private_data = pcd;
- sprintf(pcd->task.status, "client");
- register_task(&pcd->task);
+ ct->task.pre_select = client_pre_select;
+ ct->task.post_select = client_post_select;
+ sprintf(ct->task.status, "client");
+ register_task(&ct->task);
return 1;
err_out:
- close(pcd->fd);
- pcd->fd = -1;
+ close(ct->fd);
+ ct->fd = -1;
return ret;
}
*
* \return Standard.
*/
-int client_open(int argc, char *argv[], struct private_client_data **pcd_ptr)
+int client_open(int argc, char *argv[], struct client_task **ct_ptr)
{
char *home = para_homedir();
struct stat statbuf;
int ret;
- struct private_client_data *pcd =
- para_calloc(sizeof(struct private_client_data));
+ struct client_task *ct = para_calloc(sizeof(struct client_task));
- *pcd_ptr = pcd;
- pcd->fd = -1;
- ret = client_cmdline_parser(argc, argv, &pcd->conf);
- HANDLE_VERSION_FLAG("client", pcd->conf);
+ *ct_ptr = ct;
+ ct->fd = -1;
+ ret = client_cmdline_parser(argc, argv, &ct->conf);
+ HANDLE_VERSION_FLAG("client", ct->conf);
ret = -E_CLIENT_SYNTAX;
- if (!pcd->conf.inputs_num)
+ if (!ct->conf.inputs_num)
goto out;
- pcd->user = pcd->conf.user_given?
- para_strdup(pcd->conf.user_arg) : para_logname();
+ ct->user = ct->conf.user_given?
+ para_strdup(ct->conf.user_arg) : para_logname();
- pcd->key_file = pcd->conf.key_file_given?
- para_strdup(pcd->conf.key_file_arg) :
- make_message("%s/.paraslash/key.%s", home, pcd->user);
+ ct->key_file = ct->conf.key_file_given?
+ para_strdup(ct->conf.key_file_arg) :
+ make_message("%s/.paraslash/key.%s", home, ct->user);
- pcd->config_file = pcd->conf.config_file_given?
- para_strdup(pcd->conf.config_file_arg) :
+ ct->config_file = ct->conf.config_file_given?
+ para_strdup(ct->conf.config_file_arg) :
make_message("%s/.paraslash/client.conf", home);
- ret = stat(pcd->config_file, &statbuf);
- if (ret && pcd->conf.config_file_given) {
+ ret = stat(ct->config_file, &statbuf);
+ if (ret && ct->conf.config_file_given) {
ret = -E_NO_CONFIG;
goto out;
}
.check_required = 0,
.check_ambiguity = 0
};
- client_cmdline_parser_config_file(pcd->config_file,
- &pcd->conf, ¶ms);
+ client_cmdline_parser_config_file(ct->config_file,
+ &ct->conf, ¶ms);
}
ret = 1;
- PARA_INFO_LOG("loglevel: %d\n", pcd->conf.loglevel_arg);
- PARA_INFO_LOG("config_file: %s\n", pcd->config_file);
- PARA_INFO_LOG("key_file: %s\n", pcd->key_file);
- PARA_NOTICE_LOG("connecting %s:%d\n", pcd->conf.hostname_arg,
- pcd->conf.server_port_arg);
- ret = client_connect(pcd);
+ PARA_INFO_LOG("loglevel: %d\n", ct->conf.loglevel_arg);
+ PARA_INFO_LOG("config_file: %s\n", ct->config_file);
+ PARA_INFO_LOG("key_file: %s\n", ct->key_file);
+ PARA_NOTICE_LOG("connecting %s:%d\n", ct->conf.hostname_arg,
+ ct->conf.server_port_arg);
+ ret = client_connect(ct);
out:
free(home);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- client_close(pcd);
- *pcd_ptr = NULL;
+ client_close(ct);
+ *ct_ptr = NULL;
}
return ret;
}
static void dccp_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_dccp_recv_data *pdd = rn->private_data;
- t->ret = 1;
+ t->error = 0;
para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
}
static void dccp_recv_post_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_dccp_recv_data *pdd = rn->private_data;
- if (rn->output_error && *rn->output_error) {
- t->ret = *rn->output_error;
- goto out;
+ if (rn->output_error && *rn->output_error < 0) {
+ t->error = *rn->output_error;
+ return;
}
- t->ret = 1;
- if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds))
- goto out; /* nothing to do */
- t->ret = -E_DCCP_OVERRUN;
- if (rn->loaded >= DCCP_BUFSIZE)
- goto out;
- t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+ if (!FD_ISSET(pdd->fd, &s->rfds))
+ return; /* nothing to do */
+ if (rn->loaded >= DCCP_BUFSIZE) {
+ t->error = -E_DCCP_OVERRUN;
+ return;
+ }
+ t->error = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
DCCP_BUFSIZE - rn->loaded);
- if (t->ret <= 0) {
- if (!t->ret)
- t->ret = -E_RECV_EOF;
- goto out;
+ if (t->error > 0) {
+ rn->loaded += t->error;
+ return;
}
- rn->loaded += t->ret;
- return;
-out:
- if (t->ret < 0)
- rn->error = t->ret;
+ if (!t->error)
+ t->error = -E_RECV_EOF;
}
/**
- * the init function of the dccp receiver
+ * The init function of the dccp receiver.
*
- * \param r pointer to the receiver struct to initialize
+ * \param r Pointer to the receiver struct to initialize.
*
- * Initialize all function pointers of \a r
+ * Initialize all function pointers of \a r.
*/
void dccp_recv_init(struct receiver *r)
{
#define RBTREE_ERRORS
#define RECV_ERRORS
#define SEND_COMMON_ERRORS
+#define STDOUT_ERRORS
+
extern const char **para_errlist[];
PARA_ERROR(NO_CONFIG, "config file not found"), \
PARA_ERROR(CLIENT_AUTH, "authentication failed"), \
PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \
- PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \
#define SCHED_ERRORS \
#define STDIN_ERRORS \
- PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
PARA_ERROR(STDIN_EOF, "end of file"), \
-#define STDOUT_ERRORS \
- PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
-
#define NET_ERRORS \
PARA_ERROR(NAME_TOO_LONG, "name too long for struct sockaddr_un"), \
PARA_ERROR(NO_MORE_SLOTS, "no more empty slots"), \
PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \
PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \
- PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
PARA_ERROR(NOT_PLAYING, "not playing"), \
#define WRITE_ERRORS \
PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
- PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
PARA_ERROR(WAV_HEADER_SUCCESS, "successfully read wave header"), \
PARA_ERROR(NO_DELAY, "no initial delay"), \
INIT_STDERR_LOGGING(conf.loglevel_arg);
-static void filter_event_handler(struct task *t)
-{
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- unregister_task(t);
-}
-
static void open_filters(void)
{
struct filter_node *fn;
fc->inbuf = sit->buf;
fc->in_loaded = &sit->loaded;
- fc->input_error = &sit->error;
- fc->error = 0;
- fc->output_error = &sot->error;
- fc->task.private_data = fc;
+ fc->input_error = &sit->task.error;
+ fc->task.error = 0;
+ fc->output_error = &sot->task.error;
fc->task.pre_select = filter_pre_select;
- fc->task.event_handler = filter_event_handler;
sprintf(fc->task.status, "filter chain");
for (i = 0; i < conf.filter_given; i++) {
stdout_set_defaults(sot);
sot->buf = fc->outbuf;
sot->loaded = fc->out_loaded;
- sot->input_error = &fc->error;
+ sot->input_error = &fc->task.error;
register_task(&sit->task);
register_task(&fc->task);
size_t *in_loaded;
/** Contains the number of bytes loaded in the output buffer. */
size_t *out_loaded;
- /** Non-zero if this filter wont' produce any more output. */
- int error;
/** Pointer to the error variable of the receiving application. */
int *input_error;
/** Pointer to the error variable of the writing application. */
*/
void filter_pre_select(__a_unused struct sched *s, struct task *t)
{
- struct filter_chain *fc = t->private_data;
+ struct filter_chain *fc = container_of(t, struct filter_chain, task);
struct filter_node *fn;
char *ib;
size_t *loaded;
int conv, conv_total = 0;
- if (fc->output_error && *fc->output_error) {
- t->ret = *fc->output_error;
- goto err_out;
+ if (fc->output_error && *fc->output_error < 0) {
+ t->error = *fc->output_error;
+ return;
}
again:
ib = fc->inbuf;
size_t size, old_fn_loaded = fn->loaded;
// PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
// fc, *loaded, fn->filter->name);
- t->ret = fn->filter->convert(ib, *loaded, fn);
- if (t->ret < 0)
- goto err_out;
- size = t->ret;
+ t->error = fn->filter->convert(ib, *loaded, fn);
+ if (t->error < 0)
+ return;
+ size = t->error;
call_callbacks(fn, ib, size, fn->buf + old_fn_loaded,
fn->loaded - old_fn_loaded);
*loaded -= size;
// fc->eof, *fc->out_loaded, conv, conv_total);
if (conv)
goto again;
- t->ret = 1;
if (!*fc->input_error)
return;
if (*fc->out_loaded)
return;
if (*fc->in_loaded && conv_total)
return;
- t->ret = -E_FC_EOF;
-err_out:
- fc->error = t->ret;
+ t->error = -E_FC_EOF;
}
/**
static void http_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
- t->ret = 1;
+ t->error = 0;
if (phd->status == HTTP_CONNECTED)
para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
else
static void http_recv_post_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
- if (rn->output_error && *rn->output_error) {
- t->ret = *rn->output_error;
- goto out;
+ if (rn->output_error && *rn->output_error < 0) {
+ t->error = *rn->output_error;
+ return;
}
- t->ret = 1;
- if (!s->select_ret)
- goto out;
if (phd->status == HTTP_CONNECTED) {
char *rq;
if (!FD_ISSET(phd->fd, &s->wfds))
- goto out;
+ return;
rq = make_request_msg();
PARA_INFO_LOG("sending http request\n");
- t->ret = send_va_buffer(phd->fd, "%s", rq);
+ t->error = send_va_buffer(phd->fd, "%s", rq);
free(rq);
- if (t->ret > 0)
+ if (t->error >= 0)
phd->status = HTTP_SENT_GET_REQUEST;
- goto out;
+ return;
}
if (!FD_ISSET(phd->fd, &s->rfds))
- goto out;
+ return;
if (phd->status == HTTP_SENT_GET_REQUEST) {
- t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
- if (t->ret < 0)
- goto out;
- PARA_INFO_LOG("received ok msg, streaming\n");
- t->ret = 1;
- phd->status = HTTP_STREAMING;
- goto out;
+ t->error = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+ if (t->error >= 0) {
+ PARA_INFO_LOG("received ok msg, streaming\n");
+ phd->status = HTTP_STREAMING;
+ }
+ return;
}
- t->ret = -E_HTTP_RECV_OVERRUN;
- if (rn->loaded >= BUFSIZE)
- goto out;
- t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+ if (rn->loaded >= BUFSIZE) {
+ t->error = -E_HTTP_RECV_OVERRUN;
+ return;
+ }
+ t->error = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
BUFSIZE - rn->loaded);
- if (t->ret <= 0) {
- if (!t->ret)
- t->ret = -E_RECV_EOF;
- goto out;
+ if (t->error > 0) {
+ rn->loaded += t->error;
+ return;
}
- rn->loaded += t->ret;
-out:
- if (t->ret < 0)
- rn->error = t->ret;
+ if (!t->error)
+ t->error = -E_RECV_EOF;
}
static void http_recv_close(struct receiver_node *rn)
static void ortp_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_ortp_recv_data *pord = rn->private_data;
struct timeval tmp;
}
if (tv_diff(&s->timeout, &tmp, NULL) > 0)
s->timeout = tmp;
- t->ret = 1;
}
static void compute_next_chunk(unsigned chunk_time,
static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
{
- struct receiver_node *rn = t->private_data;
+ struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_ortp_recv_data *pord = rn->private_data;
mblk_t *mp;
int packet_type, stream_type;
size_t packet_size;
// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
- if (rn->output_error && *rn->output_error) {
- rn->error = *rn->output_error;
- t->ret = rn->error;
+ if (rn->output_error && *rn->output_error < 0) {
+ t->error = *rn->output_error;
return;
}
- t->ret = 1;
if (pord->start.tv_sec)
if (tv_diff(now, &pord->next_chunk, NULL) < 0)
return;
// PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
// pord->timestamp, rn->loaded, pord->c_bad);
pord->c_bad++;
- t->ret = -E_TOO_MANY_BAD_CHUNKS;
- if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
+ if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000) {
+ t->error = -E_TOO_MANY_BAD_CHUNKS;
return;
- t->ret = 1;
+ }
tv_add(now, &min_delay, &pord->next_chunk);
return;
}
/* okay, we have a chunk of data */
if (!pord->start.tv_sec)
pord->start = *now;
- t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
- if (t->ret < ORTP_AUDIO_HEADER_LEN) {
- if (t->ret < 0)
- t->ret = -E_MSG_TO_BUF;
+ t->error = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+ if (t->error < ORTP_AUDIO_HEADER_LEN) {
+ if (t->error < 0)
+ t->error = -E_MSG_TO_BUF;
else
- t->ret = -E_ORTP_RECV_EOF;
- rn->error = t->ret;
+ t->error = -E_ORTP_RECV_EOF;
goto err_out;
}
- packet_size = t->ret;
+ packet_size = t->error;
packet_type = READ_PACKET_TYPE(tmpbuf);
stream_type = READ_STREAM_TYPE(tmpbuf);
chunk_time = READ_CHUNK_TIME(tmpbuf);
switch (packet_type) {
unsigned header_len, payload_len;
case ORTP_EOF:
- t->ret = -E_RECV_EOF;
- rn->error = t->ret;
+ t->error = -E_RECV_EOF;
goto err_out;
case ORTP_BOF:
PARA_INFO_LOG("bof (%zu)\n", packet_size);
if (!pord->have_header && stream_type)
/* can't use the data, wait for header */
goto success;
- t->ret = -E_OVERRUN;
- if (packet_size + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN)
+ if (packet_size + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+ t->error = -E_OVERRUN;
goto err_out;
+ }
if (packet_size > ORTP_AUDIO_HEADER_LEN) {
memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
packet_size - ORTP_AUDIO_HEADER_LEN);
rn->loaded = packet_size - ORTP_AUDIO_HEADER_LEN;
goto success;
}
- t->ret = -E_INVALID_HEADER;
- if (header_len + ORTP_AUDIO_HEADER_LEN > packet_size)
+ if (header_len + ORTP_AUDIO_HEADER_LEN > packet_size) {
+ t->error = -E_INVALID_HEADER;
goto err_out;
+ }
payload_len = packet_size - ORTP_AUDIO_HEADER_LEN - header_len;
- t->ret = -E_OVERRUN;
- if (rn->loaded + payload_len > CHUNK_SIZE)
+ if (rn->loaded + payload_len > CHUNK_SIZE) {
+ t->error = -E_OVERRUN;
goto err_out;
+ }
if (payload_len)
memcpy(rn->buf + rn->loaded, tmpbuf
+ (packet_size - payload_len), payload_len);
rn->loaded += payload_len;
- goto success;
}
success:
- t->ret = 1;
+ t->error = 0;
freemsg(mp);
if (pord->c_bad) {
pord->c_bad = 0;
compute_next_chunk(chunk_time, pord);
return;
err_out:
- rn->error = t->ret;
freemsg(mp);
}
return check_receiver_arg(conf.receiver_arg, receiver_num);
}
-static void rn_event_handler(struct task *t)
-{
- struct receiver_node *rn = t->private_data;
- PARA_NOTICE_LOG("%s\n", para_strerror(-t->ret));
- rn->error = t->ret;
- unregister_task(t);
-}
-
/**
* the main function of para_recv
*
stdout_set_defaults(&sot);
sot.buf = rn.buf;
sot.loaded = &rn.loaded;
- sot.input_error = &rn.error;
+ sot.input_error = &rn.task.error;
register_task(&sot.task);
- rn.task.private_data = &rn;
rn.task.pre_select = r->pre_select;
rn.task.post_select = r->post_select;
- rn.task.event_handler = rn_event_handler;
sprintf(rn.task.status, "receiver node");
register_task(&rn.task);
size_t loaded;
/** receiver-specific data */
void *private_data;
- /** Set to non-zero error value on errors or on end of file. */
- int error;
/** Pointer to the error member of the consumer. */
int *output_error;
/** pointer to the configuration data for this instance */
static struct timeval now_struct;
struct timeval *now = &now_struct;
+/**
+ * Remove a task from the scheduler.
+ *
+ * \param t the task to remove
+ *
+ * If the pre_select pointer of \a t is not \p NULL, it is removed from
+ * the pre_select list of the scheduler. Same goes for \a post_select.
+ */
+void unregister_task(struct task *t)
+{
+ if (!initialized)
+ return;
+ PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
+ if (t->pre_select)
+ list_del(&t->pre_select_node);
+ if (t->post_select)
+ list_del(&t->post_select_node);
+};
+
+
static void sched_preselect(struct sched *s)
{
struct task *t, *tmp;
-again:
list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
t->pre_select(s, t);
// PARA_INFO_LOG("%s \n", t->status);
- if (t->ret > 0)
+ if (t->error >= 0)
continue;
- if (!t->event_handler)
- continue;
- t->event_handler(t);
- goto again;
+ unregister_task(t);
}
}
list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
t->post_select(s, t);
// PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
- if (t->ret > 0 || !t->event_handler)
+ if (t->error >= 0)
continue;
- t->event_handler(t);
+ unregister_task(t);
}
}
*/
int schedule(struct sched *s)
{
+ int ret;
+
if (!initialized)
return -E_NOT_INITIALIZED;
gettimeofday(now, NULL);
s->timeout = s->default_timeout;
s->max_fileno = -1;
sched_preselect(s);
- s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
- &s->wfds, &s->timeout);
- if (s->select_ret < 0)
- return s->select_ret;
+ ret = para_select(s->max_fileno + 1, &s->rfds, &s->wfds, &s->timeout);
+ if (ret < 0)
+ return ret;
gettimeofday(now, NULL);
sched_post_select(s);
if (list_empty(&pre_select_list) && list_empty(&post_select_list))
PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
if (t->pre_select) {
PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
- para_list_add(&t->pre_select_node, &pre_select_list);
+ list_add_tail(&t->pre_select_node, &pre_select_list);
}
if (t->post_select) {
PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
- para_list_add(&t->post_select_node, &post_select_list);
+ list_add_tail(&t->post_select_node, &post_select_list);
}
}
-/**
- * remove a task from the scheduler
- *
- * \param t the task to remove
- *
- * If the pre_select pointer of \a t is not \p NULL, it is removed from
- * the pre_select list of the scheduler. Same goes for \a post_select.
- */
-void unregister_task(struct task *t)
-{
- if (!initialized)
- return;
- PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
- if (t->pre_select)
- list_del(&t->pre_select_node);
- if (t->post_select)
- list_del(&t->post_select_node);
-};
-
/**
* unregister all tasks
*
*
* \param id the task identifier
*
- * Find the task identified by \a id, set the tasks' return value to
- * \p -E_TASK_KILLED and call the event handler of the task.
+ * Find the task identified by \a id, set the tasks' error value to
+ * \p -E_TASK_KILLED and unregister the task.
*
* \return Positive on success, negative on errors (e.g. if \a id does not
* correspond to a registered task).
sprintf(buf, "%p", t);
if (strcmp(id, buf))
continue;
- t->ret = -E_TASK_KILLED;
- if (t->event_handler)
- t->event_handler(t);
+ t->error = -E_TASK_KILLED;
+ unregister_task(t);
return 1;
}
list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
sprintf(buf, "%p", t);
if (strcmp(id, buf))
continue;
- t->ret = -E_TASK_KILLED;
- if (t->event_handler)
- t->event_handler(t);
+ t->error = -E_TASK_KILLED;
+ unregister_task(t);
return 1;
}
return -E_NO_SUCH_TASK;
fd_set wfds;
/** highest numbered file descriptor in any of the above fd sets */
int max_fileno;
- /** the return value of the previous select call */
- int select_ret;
};
/**
* \sa struct sched
*/
struct task {
- /** pointer to the struct this task is embedded in */
- void *private_data;
/**
* the pre select hook of \a t
*
* evaluate and act upon the results of the previous select call.
*/
void (*post_select)(struct sched *s, struct task *t);
- /** gets called if pre_select or post_select returned an error */
- void (*event_handler)(struct task *t);
- /** pre_select() and post_select store their return value here */
- int ret;
+ /** Whether this task is in error state. */
+ int error;
/** position of the task in the pre_select list of the scheduler */
struct list_head pre_select_node;
/** position of the task in the post_select list of the scheduler */
*/
static void stdin_pre_select(struct sched *s, struct task *t)
{
- struct stdin_task *sit = t->private_data;
- t->ret = 1;
+ struct stdin_task *sit = container_of(t, struct stdin_task, task);
+ t->error = 0;
sit->check_fd = 0;
if (sit->loaded >= sit->bufsize)
return;
para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
}
-static void stdin_default_event_handler(struct task *t)
-{
- PARA_NOTICE_LOG("%p: %s\n", t, para_strerror(-t->ret));
- unregister_task(t);
-}
-
/**
* The post select function of the stdin task.
*
*/
static void stdin_post_select(struct sched *s, struct task *t)
{
- struct stdin_task *sit = t->private_data;
+ struct stdin_task *sit = container_of(t, struct stdin_task, task);
ssize_t ret;
- t->ret = 1;
+ t->error = 0;
if (!sit->check_fd)
return;
if (!FD_ISSET(STDIN_FILENO, &s->rfds))
return;
ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
if (ret < 0)
- t->ret = -E_STDIN_READ;
- else if (ret > 0) {
+ t->error = ERRNO_TO_PARA_ERROR(errno);
+ else if (ret > 0)
sit->loaded += ret;
- t->ret = ret;
- } else
- t->ret = -E_STDIN_EOF;
- if (t->ret < 0)
- sit->error = t->ret;
+ else
+ t->error = -E_STDIN_EOF;
}
/**
sit->bufsize = 16 * 1024,
sit->loaded = 0,
- sit->error = 0,
sit->task.pre_select = stdin_pre_select;
sit->task.post_select = stdin_post_select;
- sit->task.event_handler = stdin_default_event_handler;
- sit->task.private_data = sit;
sprintf(sit->task.status, "stdin reader");
ret = mark_fd_nonblocking(STDIN_FILENO);
if (ret >= 0)
int check_fd;
/** The task structure. */
struct task task;
- /** Non-zero on read error, or if a read from stdin returned zero. */
- int error;
};
void stdin_set_defaults(struct stdin_task *sit);
*/
static void stdout_pre_select(struct sched *s, struct task *t)
{
- struct stdout_task *sot = t->private_data;
+ struct stdout_task *sot = container_of(t, struct stdout_task, task);
- t->ret = 1;
+ t->error = 0;
sot->check_fd = 0;
if (!*sot->loaded) {
- if (*sot->input_error) {
- t->ret = *sot->input_error;
+ if (*sot->input_error < 0) {
+ t->error = *sot->input_error;
s->timeout.tv_sec = 0;
s->timeout.tv_usec = 1;
}
*/
static void stdout_post_select(struct sched *s, struct task *t)
{
- struct stdout_task *sot = t->private_data;
+ struct stdout_task *sot = container_of(t, struct stdout_task, task);
ssize_t ret;
- t->ret = 1;
+ t->error = 0;
if (!sot->check_fd) {
if (*sot->input_error)
- t->ret = *sot->input_error;
+ t->error = *sot->input_error;
return;
}
if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
return;
- t->ret = -E_STDOUT_WRITE;
ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
- if (ret <= 0)
+ if (ret < 0) {
+ t->error = -ERRNO_TO_PARA_ERROR(errno);
return;
+ }
*sot->loaded -= ret;
if (*sot->loaded)
memmove(sot->buf, sot->buf + ret, *sot->loaded);
- t->ret = 1;
-}
-
-static void stdout_default_event_handler(struct task *t)
-{
- PARA_NOTICE_LOG("%p: %s\n", t, para_strerror(-t->ret));
- unregister_task(t);
}
/**
{
int ret;
- sot->task.private_data = sot;
sot->task.pre_select = stdout_pre_select;
sot->task.post_select = stdout_post_select;
- sot->task.event_handler = stdout_default_event_handler;
- sot->error = 0;
sprintf(sot->task.status, "stdout writer");
ret = mark_fd_nonblocking(STDOUT_FILENO);
if (ret >= 0)
size_t *loaded;
/** Pointer to the error variable of the feeding task. */
int *input_error;
- /** Non-zero if a write error occurred. */
- int error;
/** The task structure. */
struct task task;
/** Whether \p STDOUT_FILENO was included in the write fd set. */
/** Number of bytes loaded in \a buf. */
size_t *loaded;
/** Non-zero if an error occurred or end of file was reached. */
- int *error;
+ int *input_error;
/** Number of channels specified in wav header given by \a buf. */
unsigned channels;
/** Sample rate specified in wav header given by \a buf. */
unsigned samplerate;
- /** The task structure for this task. */
struct task task;
};
};
static struct write_args_info conf;
+
static struct stdin_task sit;
-static struct check_wav_task cwt;
-static struct initial_delay_task idt;
+
+static struct check_wav_task the_check_wav_task;
+static struct initial_delay_task the_initial_delay_task;
+
static struct writer_node_group *wng;
/** Length of a standard wav header. */
*/
static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
{
- struct check_wav_task *wt = t->private_data;
+ struct check_wav_task *cwt = container_of(t, struct check_wav_task, task);
unsigned char *a;
+ int ret;
- if (*wt->loaded < WAV_HEADER_LEN) {
- t->ret = *wt->error? -E_PREMATURE_END : 1;
+ if (*cwt->loaded < WAV_HEADER_LEN) {
+ if (*cwt->input_error < 0)
+ t->error = *cwt->input_error;
return;
}
- wt->channels = 2;
- wt->samplerate = 44100;
- a = (unsigned char*)wt->buf;
- t->ret = -E_NO_WAV_HEADER;
- if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
- return;
- wt->channels = (unsigned) a[22];
- wt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
- *wt->loaded -= WAV_HEADER_LEN;
- memmove(wt->buf, wt->buf + WAV_HEADER_LEN, *wt->loaded);
- t->ret = -E_WAV_HEADER_SUCCESS;
- PARA_INFO_LOG("channels: %d, sample rate: %d\n", wt->channels, wt->samplerate);
+ cwt->channels = 2;
+ cwt->samplerate = 44100;
+ a = (unsigned char*)cwt->buf;
+ if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') {
+ PARA_NOTICE_LOG("wav header not found\n");
+ t->error = -E_NO_WAV_HEADER;
+ goto out;
+ }
+ cwt->channels = (unsigned) a[22];
+ cwt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+ *cwt->loaded -= WAV_HEADER_LEN;
+ memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+ t->error = -E_WAV_HEADER_SUCCESS;
+ PARA_INFO_LOG("channels: %d, sample rate: %d\n", cwt->channels, cwt->samplerate);
+out:
+ wng->channels = &cwt->channels;
+ wng->samplerate = &cwt->samplerate;
+ ret = wng_open(wng);
+ if (ret < 0)
+ t->error = ret;
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 1;
}
static void initial_delay_pre_select(struct sched *s, struct task *t)
{
- struct initial_delay_task *dt = t->private_data;
+ struct initial_delay_task *idt = container_of(t, struct initial_delay_task, task);
struct timeval diff;
- t->ret = -E_NO_DELAY;
- if (!dt->start_time.tv_sec && !dt->start_time.tv_usec)
+ if (!idt->start_time.tv_sec && !idt->start_time.tv_usec) {
+ t->error = -E_NO_DELAY;
return;
- t->ret = -E_DELAY_TIMEOUT;
- if (tv_diff(now, &dt->start_time, &diff) > 0)
+ }
+ if (tv_diff(now, &idt->start_time, &diff) > 0) {
+ t->error = -E_DELAY_TIMEOUT;
return;
- t->ret = 1;
+ }
if (tv_diff(&s->timeout , &diff, NULL) > 0)
s->timeout = diff;
}
{
int i, ret = -E_WRITE_SYNTAX;
struct writer_node_group *g = NULL;
+ struct initial_delay_task *idt = &the_initial_delay_task;
if (conf.list_writers_given) {
char *msg = NULL;
if (sscanf(conf.start_time_arg, "%lu:%lu",
&sec, &usec) != 2)
goto out;
- idt.start_time.tv_sec = sec;
- idt.start_time.tv_usec = usec;
+ idt->start_time.tv_sec = sec;
+ idt->start_time.tv_usec = usec;
}
if (!conf.writer_given) {
g = setup_default_wng();
return NULL;
}
-static void wng_event_handler(struct task *t)
-{
- struct writer_node_group *g = t->private_data;
-
- PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
- unregister_task(t);
- wng_close(g);
-}
-
-
-static void idt_event_handler(struct task *t)
-{
- int ret;
-
- PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
- unregister_task(t);
- wng->buf = sit.buf;
- wng->loaded = &sit.loaded;
- wng->input_error = &sit.error;
- wng->task.event_handler = wng_event_handler;
- wng->channels = &cwt.channels;
- wng->samplerate = &cwt.samplerate;
- ret = wng_open(wng);
- if (ret < 0) {
- PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- exit(EXIT_FAILURE);
- }
-}
-
-static void cwt_event_handler(struct task *t)
-{
- if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_WAV_HEADER_SUCCESS) {
- PARA_ERROR_LOG("%s\n", para_strerror(-t->ret));
- exit(EXIT_FAILURE);
- }
- PARA_INFO_LOG("%s\n", para_strerror(-t->ret));
- unregister_task(t);
-// if (t->ret == -E_WAV_HEADER_SUCCESS) {
-// conf.channels_arg = cwt.channels;
-// conf.sample_rate_arg = cwt.sample_rate;
-// }
- idt.task.pre_select = initial_delay_pre_select;
- idt.task.private_data = &idt;
- idt.task.event_handler = idt_event_handler;
- sprintf(idt.task.status, "initial_delay");
- register_task(&idt.task);
-}
-
/**
* Para_write's main function.
*
{
int ret = -E_WRITE_SYNTAX;
struct sched s;
+ struct check_wav_task *cwt = &the_check_wav_task;
+ struct initial_delay_task *idt = &the_initial_delay_task;
write_cmdline_parser(argc, argv, &conf);
HANDLE_VERSION_FLAG("write", conf);
stdin_set_defaults(&sit);
if (conf.bufsize_given)
sit.bufsize = conf.bufsize_arg;
- sit.buf = para_malloc(sit.bufsize),
+ sit.buf = para_malloc(sit.bufsize);
+
+ wng->buf = sit.buf;
+ wng->loaded = &sit.loaded;
+ wng->input_error = &sit.task.error;
+
register_task(&sit.task);
- cwt.task.pre_select = check_wav_pre_select;
- cwt.task.private_data = &cwt;
- cwt.task.event_handler = cwt_event_handler;
- cwt.buf = sit.buf;
- cwt.loaded = &sit.loaded;
- cwt.error = &sit.error;
- sprintf(cwt.task.status, "check wav");
- register_task(&cwt.task);
+ cwt->buf = sit.buf;
+ cwt->loaded = &sit.loaded;
+ cwt->input_error = &sit.task.error;
+ sprintf(cwt->task.status, "check wav");
+ cwt->task.pre_select = check_wav_pre_select;
+ register_task(&cwt->task);
+
+ idt->task.pre_select = initial_delay_pre_select;
+ sprintf(idt->task.status, "initial_delay");
+ register_task(&idt->task);
- s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_sec = 10;
s.default_timeout.tv_usec = 0;
ret = schedule(&s);
-
+ wng_close(wng);
out:
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
- ret = EXIT_FAILURE;
- } else
- ret = EXIT_SUCCESS;
- return ret;
+ exit(EXIT_FAILURE);
+ }
+ exit(EXIT_SUCCESS);
}
size_t max_chunk_bytes;
/** Non-zero if an error or end of file was encountered by the feeding task. */
int *input_error;
- /** Non-zero if an error occurred or end of file was encountered. */
- int error;
/** current output buffer */
char *buf;
/** number of bytes loaded in the output buffer */
static void wng_pre_select(__a_unused struct sched *s, struct task *t)
{
- struct writer_node_group *g = t->private_data;
+ struct writer_node_group *g = container_of(t, struct writer_node_group, task);
int i;
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
- t->ret = wn->writer->pre_select(s, wn);
- if (t->ret < 0) {
- g->error = t->ret;
+ t->error = wn->writer->pre_select(s, wn);
+ if (t->error < 0)
return;
- }
}
}
static void wng_post_select(struct sched *s, struct task *t)
{
- struct writer_node_group *g = t->private_data;
+ struct writer_node_group *g = container_of(t, struct writer_node_group, task);
int i;
size_t min_written = 0;
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
- t->ret = wn->writer->post_select(s, wn);
- if (t->ret < 0) {
- g->error = t->ret;
+ t->error = wn->writer->post_select(s, wn);
+ if (t->error < 0)
return;
- }
if (!i)
min_written = wn->written;
else
min_written = PARA_MIN(min_written, wn->written);
}
-// PARA_INFO_LOG("loaded: %zd, min_written: %zd bytes\n", *g->loaded, min_written);
+ //PARA_INFO_LOG("loaded: %zd, min_written: %zd bytes\n", *g->loaded, min_written);
if (min_written) {
*g->loaded -= min_written;
FOR_EACH_WRITER_NODE(i, g)
g->writer_nodes[i].written -= min_written;
}
if (!*g->loaded && *g->input_error) {
- g->error = *g->input_error;
- t->ret = g->error;
+ t->error = *g->input_error;
return;
}
- t->ret = 1;
if (*g->loaded && min_written) {
// PARA_INFO_LOG("moving %zd bytes\n", *g->loaded);
memmove(g->buf, g->buf + min_written, *g->loaded);
g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
}
sprintf(g->task.status, "%s", "writer node group");
- g->error = 0;
return 1;
err_out:
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
wn->writer->close(wn);
}
g->num_writers = 0;
- g->error = ret;
return ret;
}
g->num_writers = num_writers;
g->writer_nodes = para_calloc(num_writers
* sizeof(struct writer_node));
- g->task.private_data = g;
g->task.post_select = wng_post_select;
g->task.pre_select = wng_pre_select;
return g;