Add btr pool support to the fecdec filter.
[paraslash.git] / buffer_tree.c
#include <regex.h>
#include <stdbool.h>

#include "para.h"
#include "list.h"
#include "string.h"
#include "buffer_tree.h"
#include "error.h"
#include "sched.h"

struct btr_pool {
	char *name;
	char *area_start;
	char *area_end;
	char *rhead;
	char *whead;
};

enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};

struct btr_buffer {
	char *buf;
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	struct btr_pool *pool;
};

struct btr_buffer_reference {
	struct btr_buffer *btrb;
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	size_t wrap_count;
};

struct btr_node {
	char *name;
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	btr_command_handler execute;
	void *context;
};

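/**
 * Create a new buffer pool.
 *
 * \param name The name of the new pool (copied, used for logging).
 * \param area_size The size in bytes of the memory area to preallocate.
 *
 * \return A handle to the newly created pool.
 */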
struct btr_pool *btr_pool_new(const char *name, size_t area_size)
{
	struct btr_pool *btrp;

	PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
	btrp = para_malloc(sizeof(*btrp));
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
	btrp->name = para_strdup(name);
	return btrp;
}

/* whead = NULL means area full */

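/**
 * Deallocate a buffer pool and its memory area.
 *
 * \param btrp The pool to free. It's OK to pass \p NULL here.
 */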
void btr_pool_free(struct btr_pool *btrp)
{
	if (!btrp)
		return;
	free(btrp->area_start);
	free(btrp->name);
	free(btrp);
}

size_t btr_pool_size(struct btr_pool *btrp)
{
	return btrp->area_end - btrp->area_start;
}

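/**
 * Get the number of bytes of the pool area which currently contain data.
 *
 * A \p NULL write head means the area is completely filled.
 *
 * \return The number of bytes between the read head and the write head,
 * taking wrap-around at the end of the area into account.
 */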
size_t btr_pool_filled(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return btr_pool_size(btrp);
	if (btrp->rhead <= btrp->whead)
		return btrp->whead - btrp->rhead;
	return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
}

size_t btr_pool_unused(struct btr_pool *btrp)
{
	return btr_pool_size(btrp) - btr_pool_filled(btrp);
}

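/**
 * Get the number of unused bytes which are contiguous at the write head.
 *
 * Unlike btr_pool_unused(), this does not count free space that lies before
 * the read head, i.e. space which only becomes usable after the write head
 * has wrapped around to the start of the area.
 */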
size_t btr_pool_available(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return 0;
	if (btrp->rhead <= btrp->whead)
		return btrp->area_end - btrp->whead;
	return btrp->rhead - btrp->whead;
}

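/**
 * Obtain a pointer to the current write head of the pool.
 *
 * \param btrp The pool.
 * \param result Points to the write head afterwards. May be \p NULL.
 *
 * \return The number of contiguous bytes that may be written at the returned
 * position, as given by btr_pool_available().
 *
 * A typical producer fills the returned buffer and then hands it to the
 * buffer tree via btr_add_output_pool(), which allocates the pool space.
 * Illustrative sketch (error handling omitted, \a fd is some file descriptor
 * owned by the caller):
 *
 *	sz = btr_pool_get_buffer(btrp, &buf);
 *	n = read(fd, buf, sz);
 *	btr_add_output_pool(btrp, n, btrn);
 */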
size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
{
	if (result)
		*result = btrp->whead;
	return btr_pool_available(btrp);
}

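/**
 * Mark the given number of bytes at the write head as allocated.
 *
 * \param btrp The pool.
 * \param size Must not exceed btr_pool_available().
 *
 * This advances the write head, wrapping around to the start of the area if
 * its end is reached. A \p NULL write head afterwards indicates that the pool
 * is completely full.
 */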
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	//PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("btrp buffer full\n");
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}

static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	//PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	if (!btrp->whead)
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}

#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)

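/**
 * Create a new buffer tree node and add it to the tree.
 *
 * \param name The name of the new node (copied).
 * \param parent The parent node, or \p NULL to create a new root.
 * \param handler The command handler of the new node. May be \p NULL.
 * \param context Stored in the node and available via btr_context().
 *
 * \return A handle to the new node.
 */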
struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
		btr_command_handler handler, void *context)
{
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	btrn->name = para_strdup(name);
	btrn->parent = parent;
	btrn->execute = handler;
	btrn->context = context;
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	if (parent)
		list_add_tail(&btrn->node, &parent->children);
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	if (parent)
		PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
	else
		PARA_INFO_LOG("added %s as btr root\n", name);
	return btrn;
}

/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
{
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));

	btrb->buf = buf;
	btrb->size = size;
	return btrb;
}

static void dealloc_buffer(struct btr_buffer *btrb)
{
	if (btrb->pool)
		btr_pool_deallocate(btrb->pool, btrb->size);
	else
		free(btrb->buf);
}

static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
{
	if (list_empty(&btrn->input_queue))
		return NULL;
	return list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);
}

/*
 * Deallocate the reference, release the resources if refcount drops to zero.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
{
	struct btr_buffer *btrb = br->btrb;

	//PARA_CRIT_LOG("dropping buffer reference %p\n", br);
	list_del(&br->node);
	free(br);
	btrb->refcount--;
	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);
		free(btrb);
	}
}

static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
{
	struct btr_node *ch;

	if (btrn->start.tv_sec == 0)
		btrn->start = *now;
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));
		br->btrb = btrb;
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		btrb->refcount++;
		if (ch->start.tv_sec == 0)
			ch->start = *now;
	}
}

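/**
 * Insert a malloced buffer into the buffer tree.
 *
 * \param buf The buffer to insert.
 * \param size The size of \a buf.
 * \param btrn Position in the buffer tree to create the output.
 *
 * The buffer tree takes ownership of \a buf: a reference is added to the
 * input queue of each child of \a btrn, and the buffer is freed once all
 * children have consumed it. If \a btrn has no children, \a buf is freed
 * immediately.
 */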
void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
{
	struct btr_buffer *btrb;

	assert(size != 0);
	if (list_empty(&btrn->children)) {
		free(buf);
		return;
	}
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
}

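/**
 * Feed data from a buffer pool to the children of a buffer tree node.
 *
 * \param btrp The buffer pool which contains the data.
 * \param size The number of bytes to allocate at the write head of \a btrp.
 * \param btrn Position in the buffer tree to create the output.
 *
 * A reference to the resulting pool buffer is added to the input queue of
 * each child of \a btrn; the pool space is deallocated automatically when
 * the last reference is dropped. If \a btrn has no children, nothing is done.
 */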
void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
{
	struct btr_buffer *btrb;
	char *buf;
	size_t avail;

	assert(size != 0);
	if (list_empty(&btrn->children))
		return;
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	btrb->pool = btrp;
	add_btrb_to_children(btrb, btrn, 0);
}

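/**
 * Copy data to the write head of a buffer pool and feed it to all children.
 *
 * \param src The source buffer.
 * \param n The number of bytes to copy.
 * \param btrp The destination pool.
 * \param btrn Add the data as output of this node.
 *
 * Since the pool area is a ring buffer, the copy may be split into two
 * chunks: one at the write head and, if that is not large enough, a second
 * one at the start of the area. The caller must make sure that at least
 * \a n bytes of the pool are unused.
 */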
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}

static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}

void btr_pushdown(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
}

int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;

	if (list_empty(&btrn->input_queue))
		return 0;
	br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
	btr_pushdown_br(br, btrn);
	return 1;
}

/* Return true if this node has no children. */
bool btr_no_children(struct btr_node *btrn)
{
	return list_empty(&btrn->children);
}

bool btr_no_parent(struct btr_node *btrn)
{
	return !btrn->parent;
}

bool btr_inplace_ok(struct btr_node *btrn)
{
	if (!btrn->parent)
		return true;
	return list_is_singular(&btrn->parent->children);
}

static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
	return br->btrb->size - br->consumed;
}

size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
{
	if (buf)
		*buf = br->btrb->buf + br->consumed;
	return br_available_bytes(br);
}

/**
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool)
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf) {
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
			break;
		}
//		PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
//			rv, sz, rv + sz);
//		PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
//			result, sz);
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}

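/**
 * Deallocate the given number of bytes from the input queue.
 *
 * \param btrn The buffer tree node.
 * \param numbytes The number of bytes to mark as consumed. Must not exceed
 * the size of the input queue.
 *
 * Buffer references whose buffer has been fully consumed are dropped. If the
 * first reference is a wrap buffer, consumption is accounted against the
 * wrap buffer first and propagated to the underlying buffers once the
 * wrapped part has been used up.
 */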
void btr_consume(struct btr_node *btrn, size_t numbytes)
{
	struct btr_buffer_reference *br, *tmp;
	size_t sz;

	if (numbytes == 0)
		return;
	br = get_first_input_br(btrn);
	assert(br);

	//PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used. */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
				return;
			}
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
		}
		assert(false); /* numbytes exceeded the input queue size */
	}
	/*
	 * We have a wrap buffer, consume from it. If in total, i.e. including
	 * previous calls to btr_consume(), less than wrap_count bytes have
	 * been consumed, there is nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume the correct amount of
	 * bytes from the subsequent buffers of the input queue, i.e. the
	 * total number of bytes that have been consumed from the wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));

	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
		return;
	}
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	return btr_consume(btrn, sz);
}

static void flush_input_queue(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;
	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);
}

void btr_free_node(struct btr_node *btrn)
{
	if (!btrn)
		return;
	free(btrn->name);
	free(btrn);
}

void btr_remove_node(struct btr_node *btrn)
{
	struct btr_node *ch;

	if (!btrn)
		return;
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		ch->parent = NULL;
	flush_input_queue(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
}

size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		//PARA_CRIT_LOG("size: %zu\n", size);
		size += br_available_bytes(br);
	}
	return size;
}

void btr_splice_out_node(struct btr_node *btrn)
{
	struct btr_node *ch, *tmp;

	assert(btrn);
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	btr_pushdown(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		if (btrn->parent)
			list_move(&ch->node, &btrn->parent->children);
	}
	assert(list_empty(&btrn->children));
}

/**
 * Return the size of the largest input queue.
 *
 * Iterates over all children of the given node.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
{
	size_t max_size = 0;
	struct btr_node *ch;

	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);
		max_size = PARA_MAX(max_size, size);
	}
	return max_size;
}

int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
{
	if (!btrn)
		return -ERRNO_TO_PARA_ERROR(EINVAL);
	if (!btrn->execute)
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
	return btrn->execute(btrn, command, value_result);
}

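/**
 * Execute a command on the ancestors of a buffer tree node.
 *
 * \param btrn Start the search at the parent of this node.
 * \param command The command to execute.
 * \param value_result Passed to the command handlers of the ancestor nodes.
 *
 * Walks up the tree and calls the command handler of each ancestor that has
 * one, until a handler returns something other than the paraslash error code
 * for \p ENOTSUP.
 *
 * \return Positive on success, negative on errors. If no ancestor handles
 * \a command, the paraslash error code for \p ENOTSUP is returned.
 */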
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		if (!parent)
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue;
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}

void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}

static bool need_buffer_pool_merge(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return false;
	if (br->wrap_count != 0)
		return true;
	if (br->btrb->pool)
		return true;
	return false;
}

static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	if (list_empty(&btrn->input_queue))
		return;

	num_refs = 0;
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (br->wrap_count != 0) {
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!wbr) {
		assert(buf1);
		if (!buf2) /* nothing to do */
			return;
		/* make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
		return;
	assert(buf1 && buf2);
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}

/**
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
		szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}

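/**
 * Merge the input queue of a buffer tree node.
 *
 * \param btrn The node whose input buffers should be merged.
 * \param dest_size Stop merging when a contiguous buffer of at least this
 * size is available.
 *
 * For pool buffers and wrap buffers merge_input_pool() is used; otherwise
 * adjacent malloced buffers are copied into a single new buffer by
 * merge_input().
 */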
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);
		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return;
	}
}

bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}

void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
{
	struct btr_node *ch;
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	if (depth > 16)
		return;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
}

void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	return log_tree_recursively(btrn, loglevel, 0);
}

/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

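/**
 * Query the status of a buffer tree node.
 *
 * \param btrn The node to check.
 * \param min_iqs The minimal input queue size required by the node.
 * \param type The type of the node (root, internal or leaf).
 *
 * \return Positive if the node should continue processing, zero if it should
 * wait (input not yet available, or too much output pending in the children),
 * negative on errors such as end of file or no children left.
 */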
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0;
	}
	return 1;
}

void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}