]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
[btr] Add a comment to btr_pool_available().
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 /*
104  * Return maximal size available for one read. This is
105  * smaller than the value returned by btr_pool_unused().
106  */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109         if (!btrp->whead)
110                 return 0;
111         if (btrp->rhead <= btrp->whead)
112                 return btrp->area_end - btrp->whead;
113         return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118         if (result)
119                 *result = btrp->whead;
120         return btr_pool_available(btrp);
121 }
122
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
124 {
125         char *end;
126
127         if (size == 0)
128                 return;
129         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
130         assert(size <= btr_pool_available(btrp));
131         end = btrp->whead + size;
132         assert(end <= btrp->area_end);
133
134         if (end == btrp->area_end) {
135                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
136                 end = btrp->area_start;
137         }
138         if (end == btrp->rhead) {
139                 PARA_DEBUG_LOG("btrp buffer full\n");
140                 end = NULL; /* buffer full */
141         }
142         btrp->whead = end;
143         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
144 }
145
146 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
147 {
148         char *end = btrp->rhead + size;
149
150         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
151         if (size == 0)
152                 return;
153         assert(end <= btrp->area_end);
154         assert(size <= btr_pool_filled(btrp));
155         if (end == btrp->area_end)
156                 end = btrp->area_start;
157         if (!btrp->whead)
158                 btrp->whead = btrp->rhead;
159         btrp->rhead = end;
160         if (btrp->rhead == btrp->whead)
161                 btrp->rhead = btrp->whead = btrp->area_start;
162         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
163 }
164
165 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
166         &((_btrn)->children), node)
167 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
168         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
169
170 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
171         list_for_each_entry((_br), &(_btrn)->input_queue, node)
172 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
173         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
174
175 /*
176         (parent, child):
177         (NULL, NULL): new, isolated node.
178         (NULL, c): new node becomes root, c must be old root
179         (p, NULL): new leaf node
180         (p, c): new internal node, ch must be child of p
181
182 */
183 struct btr_node *btr_new_node(struct btr_node_description *bnd)
184 {
185         struct btr_node *btrn = para_malloc(sizeof(*btrn));
186
187         btrn->name = para_strdup(bnd->name);
188         btrn->parent = bnd->parent;
189         btrn->execute = bnd->handler;
190         btrn->context = bnd->context;
191         btrn->start.tv_sec = 0;
192         btrn->start.tv_usec = 0;
193         if (bnd->parent)
194                 list_add_tail(&btrn->node, &bnd->parent->children);
195         INIT_LIST_HEAD(&btrn->children);
196         INIT_LIST_HEAD(&btrn->input_queue);
197         if (bnd->parent)
198                 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
199         else
200                 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
201         return btrn;
202 }
203
204 /*
205  * Allocate a new btr buffer.
206  *
207  * The freshly allocated buffer will have a zero refcount and will
208  * not be associated with a btr pool.
209  */
210 static struct btr_buffer *new_btrb(char *buf, size_t size)
211 {
212         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
213
214         btrb->buf = buf;
215         btrb->size = size;
216         return btrb;
217 }
218
219 static void dealloc_buffer(struct btr_buffer *btrb)
220 {
221         if (btrb->pool)
222                 btr_pool_deallocate(btrb->pool, btrb->size);
223         else
224                 free(btrb->buf);
225 }
226
227 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
228 {
229         if (list_empty(&btrn->input_queue))
230                 return NULL;
231         return list_first_entry(&btrn->input_queue,
232                 struct btr_buffer_reference, node);
233 }
234
235 /*
236  * Deallocate the reference, release the resources if refcount drops to zero.
237  */
238 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
239 {
240         struct btr_buffer *btrb = br->btrb;
241
242         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
243         list_del(&br->node);
244         free(br);
245         btrb->refcount--;
246         if (btrb->refcount == 0) {
247                 dealloc_buffer(btrb);
248                 free(btrb);
249         }
250 }
251
252 static void add_btrb_to_children(struct btr_buffer *btrb,
253                 struct btr_node *btrn, size_t consumed)
254 {
255         struct btr_node *ch;
256
257         if (btrn->start.tv_sec == 0)
258                 btrn->start = *now;
259         FOR_EACH_CHILD(ch, btrn) {
260                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
261                 br->btrb = btrb;
262                 br->consumed = consumed;
263                 list_add_tail(&br->node, &ch->input_queue);
264                 btrb->refcount++;
265                 if (ch->start.tv_sec == 0)
266                         ch->start = *now;
267         }
268 }
269
270 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
271 {
272         struct btr_buffer *btrb;
273
274         assert(size != 0);
275         if (list_empty(&btrn->children)) {
276                 free(buf);
277                 return;
278         }
279         btrb = new_btrb(buf, size);
280         add_btrb_to_children(btrb, btrn, 0);
281 }
282
283 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
284                 struct btr_node *btrn)
285 {
286         struct btr_buffer *btrb;
287         char *buf;
288         size_t avail;
289
290         assert(size != 0);
291         if (list_empty(&btrn->children))
292                 return;
293         avail = btr_pool_get_buffer(btrp, &buf);
294         assert(avail >= size);
295         btr_pool_allocate(btrp, size);
296         btrb = new_btrb(buf, size);
297         btrb->pool = btrp;
298         add_btrb_to_children(btrb, btrn, 0);
299 }
300
301 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
302         struct btr_node *btrn)
303 {
304         char *buf;
305         size_t sz, copy;
306
307         if (n == 0)
308                 return;
309         assert(n <= btr_pool_unused(btrp));
310         sz = btr_pool_get_buffer(btrp, &buf);
311         copy = PARA_MIN(sz, n);
312         memcpy(buf, src, copy);
313         btr_add_output_pool(btrp, copy, btrn);
314         if (copy == n)
315                 return;
316         sz = btr_pool_get_buffer(btrp, &buf);
317         assert(sz >= n - copy);
318         memcpy(buf, src + copy, n - copy);
319         btr_add_output_pool(btrp, n - copy, btrn);
320 }
321
322 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
323 {
324         add_btrb_to_children(br->btrb, btrn, br->consumed);
325         btr_drop_buffer_reference(br);
326 }
327
328 void btr_pushdown(struct btr_node *btrn)
329 {
330         struct btr_buffer_reference *br, *tmp;
331
332         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
333                 btr_pushdown_br(br, btrn);
334 }
335
336 int btr_pushdown_one(struct btr_node *btrn)
337 {
338         struct btr_buffer_reference *br;
339
340         if (list_empty(&btrn->input_queue))
341                 return 0;
342         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
343         btr_pushdown_br(br, btrn);
344         return 1;
345 }
346
347 /* Return true if this node has no children. */
348 bool btr_no_children(struct btr_node *btrn)
349 {
350         return list_empty(&btrn->children);
351 }
352
353 bool btr_no_parent(struct btr_node *btrn)
354 {
355         return !btrn->parent;
356 }
357
358 bool btr_inplace_ok(struct btr_node *btrn)
359 {
360         if (!btrn->parent)
361                 return true;
362         return list_is_singular(&btrn->parent->children);
363 }
364
365 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
366 {
367         return br->btrb->size - br->consumed;
368 }
369
370 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
371 {
372         if (buf)
373                 *buf = br->btrb->buf + br->consumed;
374         return br_available_bytes(br);
375 }
376
377 /**
378  * \return zero if the input buffer queue is empty.
379  */
380 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
381 {
382         struct btr_buffer_reference *br;
383         char *buf, *result = NULL;
384         size_t sz, rv = 0;
385
386         FOR_EACH_BUFFER_REF(br, btrn) {
387                 sz = btr_get_buffer_by_reference(br, &buf);
388                 if (!result) {
389                         result = buf;
390                         rv = sz;
391                         if (!br->btrb->pool)
392                                 break;
393                         continue;
394                 }
395                 if (!br->btrb->pool)
396                         break;
397                 if (result + rv != buf) {
398                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
399                                 btrn->name, result + rv, buf);
400                         break;
401                 }
402 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
403 //                      rv, sz, rv + sz);
404 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
405 //                      result, sz);
406                 rv += sz;
407         }
408         if (bufp)
409                 *bufp = result;
410         return rv;
411 }
412
413 void btr_consume(struct btr_node *btrn, size_t numbytes)
414 {
415         struct btr_buffer_reference *br, *tmp;
416         size_t sz;
417
418         if (numbytes == 0)
419                 return;
420         br = get_first_input_br(btrn);
421         assert(br);
422
423         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
424         if (br->wrap_count == 0) {
425                 /*
426                  * No wrap buffer. Drop buffer references whose buffer
427                  * has been fully used. */
428                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
429                         if (br->consumed + numbytes <= br->btrb->size) {
430                                 br->consumed += numbytes;
431                                 if (br->consumed == br->btrb->size)
432                                         btr_drop_buffer_reference(br);
433                                 return;
434                         }
435                         numbytes -= br->btrb->size - br->consumed;
436                         btr_drop_buffer_reference(br);
437                 }
438                 assert(true);
439         }
440         /*
441
442         We have a wrap buffer, consume from it. If in total,
443         i.e. including previous calls to brt_consume(), less than
444         wrap_count has been consumed, there's nothing more we can do.
445
446         Otherwise we drop the wrap buffer and consume from subsequent
447         buffers of the input queue the correct amount of bytes. This
448         is the total number of bytes that have been consumed from the
449         wrap buffer.
450 */
451         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
452                 br_available_bytes(br));
453
454         assert(numbytes <= br_available_bytes(br));
455         if (br->consumed + numbytes < br->wrap_count) {
456                 br->consumed += numbytes;
457                 return;
458         }
459         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
460         /* get rid of the wrap buffer */
461         sz = br->consumed + numbytes;
462         btr_drop_buffer_reference(br);
463         return btr_consume(btrn, sz);
464 }
465
466 static void flush_input_queue(struct btr_node *btrn)
467 {
468         struct btr_buffer_reference *br, *tmp;
469         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
470                 btr_drop_buffer_reference(br);
471 }
472
473 void btr_free_node(struct btr_node *btrn)
474 {
475         if (!btrn)
476                 return;
477         free(btrn->name);
478         free(btrn);
479 }
480
481 void btr_remove_node(struct btr_node *btrn)
482 {
483         struct btr_node *ch;
484
485         if (!btrn)
486                 return;
487         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
488         FOR_EACH_CHILD(ch, btrn)
489                 ch->parent = NULL;
490         flush_input_queue(btrn);
491         if (btrn->parent)
492                 list_del(&btrn->node);
493 }
494
495 size_t btr_get_input_queue_size(struct btr_node *btrn)
496 {
497         struct btr_buffer_reference *br;
498         size_t size = 0, wrap_consumed = 0;
499
500         FOR_EACH_BUFFER_REF(br, btrn) {
501                 if (br->wrap_count != 0) {
502                         wrap_consumed = br->consumed;
503                         continue;
504                 }
505                 size += br_available_bytes(br);
506         }
507         assert(wrap_consumed <= size);
508         size -= wrap_consumed;
509         return size;
510 }
511
512 void btr_splice_out_node(struct btr_node *btrn)
513 {
514         struct btr_node *ch, *tmp;
515
516         assert(btrn);
517         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
518         btr_pushdown(btrn);
519         if (btrn->parent)
520                 list_del(&btrn->node);
521         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
522                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
523                         btrn->parent? btrn->parent->name : "NULL");
524                 ch->parent = btrn->parent;
525                 if (btrn->parent)
526                         list_move(&ch->node, &btrn->parent->children);
527         }
528         assert(list_empty(&btrn->children));
529 }
530
531 /**
532  * Return the size of the largest input queue.
533  *
534  * Iterates over all children of the given node.
535  */
536 size_t btr_bytes_pending(struct btr_node *btrn)
537 {
538         size_t max_size = 0;
539         struct btr_node *ch;
540
541         FOR_EACH_CHILD(ch, btrn) {
542                 size_t size = btr_get_input_queue_size(ch);
543                 max_size = PARA_MAX(max_size, size);
544         }
545         return max_size;
546 }
547
548 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
549 {
550         if (!btrn)
551                 return -ERRNO_TO_PARA_ERROR(EINVAL);
552         if (!btrn->execute)
553                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
554         return btrn->execute(btrn, command, value_result);
555 }
556
557 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
558 {
559         int ret;
560
561         for (; btrn; btrn = btrn->parent) {
562                 struct btr_node *parent = btrn->parent;
563                 if (!parent)
564                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
565                 if (!parent->execute)
566                         continue;
567                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
568                 ret = parent->execute(parent, command, value_result);
569                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
570                         continue;
571                 if (ret < 0)
572                         return ret;
573                 if (value_result && *value_result)
574                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
575                                 *value_result);
576                 return 1;
577         }
578         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
579 }
580
581 void *btr_context(struct btr_node *btrn)
582 {
583         return btrn->context;
584 }
585
586 static bool need_buffer_pool_merge(struct btr_node *btrn)
587 {
588         struct btr_buffer_reference *br = get_first_input_br(btrn);
589
590         if (!br)
591                 return false;
592         if (br->wrap_count != 0)
593                 return true;
594         if (br->btrb->pool)
595                 return true;
596         return false;
597 }
598
599 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
600 {
601         struct btr_buffer_reference *br, *wbr = NULL;
602         int num_refs; /* including wrap buffer */
603         char *buf, *buf1 = NULL, *buf2 = NULL;
604         size_t sz, sz1 = 0, sz2 = 0, wsz;
605
606         br = get_first_input_br(btrn);
607         if (!br || br_available_bytes(br) >= dest_size)
608                 return;
609         num_refs = 0;
610         FOR_EACH_BUFFER_REF(br, btrn) {
611                 num_refs++;
612                 sz = btr_get_buffer_by_reference(br, &buf);
613                 if (sz == 0)
614                         break;
615                 if (br->wrap_count != 0) {
616                         assert(!wbr);
617                         assert(num_refs == 1);
618                         wbr = br;
619                         if (sz >= dest_size)
620                                 return;
621                         continue;
622                 }
623                 if (!buf1) {
624                         buf1 = buf;
625                         sz1 = sz;
626                         goto next;
627                 }
628                 if (buf1 + sz1 == buf) {
629                         sz1 += sz;
630                         goto next;
631                 }
632                 if (!buf2) {
633                         buf2 = buf;
634                         sz2 = sz;
635                         goto next;
636                 }
637                 assert(buf2 + sz2 == buf);
638                 sz2 += sz;
639 next:
640                 if (sz1 + sz2 >= dest_size)
641                         break;
642         }
643         if (!buf2) /* nothing to do */
644                 return;
645         assert(buf1 && sz2 > 0);
646         /*
647          * If the second buffer is large, we only take the first part of it to
648          * avoid having to memcpy() huge buffers.
649          */
650         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
651         if (!wbr) {
652                 /* Make a new wrap buffer combining buf1 and buf2. */
653                 sz = sz1 + sz2;
654                 buf = para_malloc(sz);
655                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
656                         buf1, sz1, buf2, sz2, buf, sz);
657                 memcpy(buf, buf1, sz1);
658                 memcpy(buf + sz1, buf2, sz2);
659                 br = para_calloc(sizeof(*br));
660                 br->btrb = new_btrb(buf, sz);
661                 br->btrb->refcount = 1;
662                 br->consumed = 0;
663                 /* This is a wrap buffer */
664                 br->wrap_count = sz1;
665                 para_list_add(&br->node, &btrn->input_queue);
666                 return;
667         }
668         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
669         /*
670          * We already have a wrap buffer, but it is too small. It might be
671          * partially used.
672          */
673         wsz = br_available_bytes(wbr);
674         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
675                 return;
676         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
677         wbr->btrb->size += sz;
678         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
679         /* copy the new data to the end of the reallocated buffer */
680         assert(sz2 >= sz);
681         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
682 }
683
684 /**
685  * Merge the first two input buffers into one.
686  *
687  * This is a quite expensive operation.
688  *
689  * \return The number of buffers that have been available (zero, one or two).
690  */
691 static int merge_input(struct btr_node *btrn)
692 {
693         struct btr_buffer_reference *brs[2], *br;
694         char *bufs[2], *buf;
695         size_t szs[2], sz;
696         int i;
697
698         if (list_empty(&btrn->input_queue))
699                 return 0;
700         if (list_is_singular(&btrn->input_queue))
701                 return 1;
702         i = 0;
703         /* get references to the first two buffers */
704         FOR_EACH_BUFFER_REF(br, btrn) {
705                 brs[i] = br;
706                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
707                 i++;
708                 if (i == 2)
709                         break;
710         }
711         /* make a new btrb that combines the two buffers and a br to it. */
712         sz = szs[0] + szs[1];
713         buf = para_malloc(sz);
714         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
715                 btrn->name, szs[0], szs[1], sz);
716         memcpy(buf, bufs[0], szs[0]);
717         memcpy(buf + szs[0], bufs[1], szs[1]);
718
719         br = para_calloc(sizeof(*br));
720         br->btrb = new_btrb(buf, sz);
721         br->btrb->refcount = 1;
722
723         /* replace the first two refs by the new one */
724         btr_drop_buffer_reference(brs[0]);
725         btr_drop_buffer_reference(brs[1]);
726         para_list_add(&br->node, &btrn->input_queue);
727         return 2;
728 }
729
730 void btr_merge(struct btr_node *btrn, size_t dest_size)
731 {
732         if (need_buffer_pool_merge(btrn))
733                 return merge_input_pool(btrn, dest_size);
734         for (;;) {
735                 char *buf;
736                 size_t len = btr_next_buffer(btrn, &buf);
737                 if (len >= dest_size)
738                         return;
739                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
740                 if (merge_input(btrn) < 2)
741                         return;
742         }
743 }
744
745 bool btr_eof(struct btr_node *btrn)
746 {
747         char *buf;
748         size_t len = btr_next_buffer(btrn, &buf);
749
750         return (len == 0 && btr_no_parent(btrn));
751 }
752
753 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
754 {
755         struct btr_node *ch;
756         const char spaces[] = "                 ", *space = spaces + 16 - depth;
757
758         if (depth > 16)
759                 return;
760         para_log(loglevel, "%s%s\n", space, btrn->name);
761         FOR_EACH_CHILD(ch, btrn)
762                 log_tree_recursively(ch, loglevel, depth + 1);
763 }
764
765 void btr_log_tree(struct btr_node *btrn, int loglevel)
766 {
767         return log_tree_recursively(btrn, loglevel, 0);
768 }
769
770 /*
771  * \return \a root if \a name is \p NULL.
772  */
773 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
774 {
775         struct btr_node *ch;
776
777         if (!name)
778                 return root;
779         if (!strcmp(root->name, name))
780                 return root;
781         FOR_EACH_CHILD(ch, root) {
782                 struct btr_node *result = btr_search_node(name, ch);
783                 if (result)
784                         return result;
785         }
786         return NULL;
787 }
788
789 /** 640K ought to be enough for everybody ;) */
790 #define BTRN_MAX_PENDING (640 * 1024)
791
792 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
793         enum btr_node_type type)
794 {
795         size_t iqs;
796
797         assert(btrn);
798         if (type != BTR_NT_LEAF) {
799                 if (btr_no_children(btrn))
800                         return -E_BTR_NO_CHILD;
801                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
802                         return 0;
803         }
804         if (type != BTR_NT_ROOT) {
805                 if (btr_eof(btrn))
806                         return -E_BTR_EOF;
807                 iqs = btr_get_input_queue_size(btrn);
808                 if (iqs == 0) /* we have a parent, because not eof */
809                         return 0;
810                 if (iqs < min_iqs && !btr_no_parent(btrn))
811                         return 0;
812         }
813         return 1;
814 }
815
816 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
817 {
818         *tv = btrn->start;
819 }