afs.cmd: Update help output.
[paraslash.git] / osl.c
1 /*
2  * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file osl.c Object storage layer functions. */
8 #include <dirent.h> /* readdir() */
9 #include <assert.h>
10
11
12 #include "para.h"
13 #include "error.h"
14 #include "fd.h"
15 #include "list.h"
16 #include "osl_core.h"
17 /**
18  * A wrapper for lseek(2).
19  *
20  * \param fd The filedescriptor whose offset is to be to repositioned.
21  * \param offset A value-result parameter.
22  * \param whence Usual repositioning directive.
23  *
24  * Reposition the offset of the file descriptor \a fd to the argument \a offset
25  * according to the directive \a whence. Upon successful return, \a offset
26  * contains the resulting offset location as measured in bytes from the
27  * beginning of the file.
28  *
29  * \return Positive on success. Otherwise, the function returns \p -E_LSEEK.
30  *
31  * \sa lseek(2).
32  */
33 int para_lseek(int fd, off_t *offset, int whence)
34 {
35         *offset = lseek(fd, *offset, whence);
36         int ret = -E_LSEEK;
37         if (*offset == -1)
38                 return ret;
39         return 1;
40 }
41
42 /**
43  * Wrapper for the write system call.
44  *
45  * \param fd The file descriptor to write to.
46  * \param buf The buffer to write.
47  * \param size The length of \a buf in bytes.
48  *
49  * This function writes out the given bufffer and retries if an interrupt
50  * occured during the write.
51  *
52  * \return On success, the number of bytes written is returned, otherwise, the
53  * function returns \p -E_WRITE.
54  *
55  * \sa write(2).
56  */
57 ssize_t para_write(int fd, const void *buf, size_t size)
58 {
59         ssize_t ret;
60
61         for (;;) {
62                 ret = write(fd, buf, size);
63                 if ((ret < 0) && (errno == EAGAIN || errno == EINTR))
64                         continue;
65                 return ret >= 0? ret : -E_WRITE;
66         }
67 }
68
69 /**
70  * Write the whole buffer to a file descriptor.
71  *
72  * \param fd The file descriptor to write to.
73  * \param buf The buffer to write.
74  * \param size The length of \a buf in bytes.
75  *
76  * This function writes the given buffer and continues on short writes and
77  * when interrupted by a signal.
78  *
79  * \return Positive on success, negative on errors. Possible errors: any
80  * errors returned by para_write().
81  *
82  * \sa para_write().
83  */
84 ssize_t para_write_all(int fd, const void *buf, size_t size)
85 {
86 //      PARA_DEBUG_LOG("writing %zu bytes\n", size);
87         const char *b = buf;
88         while (size) {
89                 ssize_t ret = para_write(fd, b, size);
90 //              PARA_DEBUG_LOG("ret: %zd\n", ret);
91                 if (ret < 0)
92                         return ret;
93                 b += ret;
94                 size -= ret;
95         }
96         return 1;
97 }
98 /**
99  * Open a file, write the given buffer and close the file.
100  *
101  * \param filename Full path to the file to open.
102  * \param buf The buffer to write to the file.
103  * \param size The size of \a buf.
104  *
105  * \return Positive on success, negative on errors. Possible errors include:
106  * any errors from para_open() or para_write().
107  *
108  * \sa para_open(), para_write().
109  */
110 int para_write_file(const char *filename, const void *buf, size_t size)
111 {
112         int ret, fd;
113
114         ret = para_open(filename, O_WRONLY | O_CREAT | O_EXCL, 0644);
115         if (ret < 0)
116                 return ret;
117         fd = ret;
118         ret = para_write_all(fd, buf, size);
119         if (ret < 0)
120                 goto out;
121         ret = 1;
122 out:
123         close(fd);
124         return ret;
125 }
126
127 static int append_file(const char *filename, char *header, size_t header_size,
128         char *data, size_t data_size, uint32_t *new_pos)
129 {
130         int ret, fd;
131
132 //      PARA_DEBUG_LOG("appending %zu  + %zu bytes\n", header_size, data_size);
133         ret = para_open(filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
134         if (ret < 0)
135                 return ret;
136         fd = ret;
137         if (header && header_size) {
138                 ret = para_write_all(fd, header, header_size);
139                 if (ret < 0)
140                         goto out;
141         }
142         ret = para_write_all(fd, data, data_size);
143         if (ret < 0)
144                 goto out;
145         if (new_pos) {
146                 off_t offset = 0;
147                 ret = para_lseek(fd, &offset, SEEK_END);
148                 if (ret < 0)
149                         goto out;
150 //              PARA_DEBUG_LOG("new file size: " FMT_OFF_T "\n", offset);
151                 *new_pos = offset;
152         }
153         ret = 1;
154 out:
155         close(fd);
156         return ret;
157 }
158
159 /**
160  * Map a file into memory.
161  *
162  * \param path Name of the regular file to map.
163  * \param open_mode Either \p O_RDONLY or \p O_RDWR.
164  * \param obj On success, the mapping is returned here.
165  *
166  * \return Positive on success, negative on errors. Possible errors include: \p
167  * E_FSTAT, any errors returned by para_open(), \p E_EMPTY, \p E_MMAP.
168  *
169  * \sa para_open(), mmap(2).
170  */
171 int mmap_full_file(const char *path, int open_mode, struct osl_object *obj)
172 {
173         int fd, ret, mmap_prot, mmap_flags;
174         struct stat file_status;
175
176         if (open_mode == O_RDONLY) {
177                 mmap_prot = PROT_READ;
178                 mmap_flags = MAP_PRIVATE;
179         } else {
180                 mmap_prot = PROT_READ | PROT_WRITE;
181                 mmap_flags = MAP_SHARED;
182         }
183         ret = para_open(path, open_mode, 0);
184         if (ret < 0)
185                 return ret;
186         fd = ret;
187         ret = -E_FSTAT;
188         if (fstat(fd, &file_status) < 0)
189                 goto out;
190         obj->size = file_status.st_size;
191         ret = -E_EMPTY;
192         PARA_DEBUG_LOG("%s: size %zu\n", path, obj->size);
193         if (!obj->size)
194                 goto out;
195         obj->data = mmap(NULL, obj->size, mmap_prot, mmap_flags, fd, 0);
196         if (obj->data == MAP_FAILED) {
197                 obj->data = NULL;
198                 ret = -E_MMAP;
199                 goto out;
200         }
201         ret = 1;
202 out:
203         close(fd);
204         return ret;
205 }
206
207 /**
208  * Traverse the given directory recursively.
209  *
210  * \param dirname The directory to traverse.
211  * \param func The function to call for each entry.
212  * \param private_data Pointer to an arbitrary data structure.
213  *
214  * For each regular file  in \a dirname, the supplied function \a func is
215  * called.  The full path of the regular file and the \a private_data pointer
216  * are passed to \a func.
217  *
218  * \return On success, 1 is returned. Otherwise, this function returns a
219  * negative value which indicates the kind of the error.
220  */
221 int for_each_file_in_dir(const char *dirname,
222                 int (*func)(const char *, const void *), const void *private_data)
223 {
224         DIR *dir;
225         struct dirent *entry;
226         int cwd_fd, ret2, ret = para_opendir(dirname, &dir, &cwd_fd);
227
228         if (ret < 0)
229                 return ret;
230         /* scan cwd recursively */
231         while ((entry = readdir(dir))) {
232                 mode_t m;
233                 char *tmp;
234                 struct stat s;
235
236                 if (!strcmp(entry->d_name, "."))
237                         continue;
238                 if (!strcmp(entry->d_name, ".."))
239                         continue;
240                 if (lstat(entry->d_name, &s) == -1)
241                         continue;
242                 m = s.st_mode;
243                 if (!S_ISREG(m) && !S_ISDIR(m))
244                         continue;
245                 tmp = make_message("%s/%s", dirname, entry->d_name);
246                 if (!S_ISDIR(m)) {
247                         ret = func(tmp, private_data);
248                         free(tmp);
249                         if (ret < 0)
250                                 goto out;
251                         continue;
252                 }
253                 /* directory */
254                 ret = for_each_file_in_dir(tmp, func, private_data);
255                 free(tmp);
256                 if (ret < 0)
257                         goto out;
258         }
259         ret = 1;
260 out:
261         closedir(dir);
262         ret2 = para_fchdir(cwd_fd);
263         if (ret2 < 0 && ret >= 0)
264                 ret = ret2;
265         close(cwd_fd);
266         return ret;
267 }
268
269 static int verify_name(const char *name)
270 {
271         if (!name)
272                 return -E_BAD_NAME;
273         if (!*name)
274                 return -E_BAD_NAME;
275         if (strchr(name, '/'))
276                 return -E_BAD_NAME;
277         if (!strcmp(name, ".."))
278                 return -E_BAD_NAME;
279         if (!strcmp(name, "."))
280                 return -E_BAD_NAME;
281         return 1;
282 }
283
284 /**
285  * Compare two osl objects pointing to unsigned integers of 32 bit size.
286  *
287  * \param obj1 Pointer to the first integer.
288  * \param obj2 Pointer to the second integer.
289  *
290  * \return The values required for an osl compare function.
291  *
292  * \sa osl_compare_func, osl_hash_compare().
293  */
294 int uint32_compare(const struct osl_object *obj1, const struct osl_object *obj2)
295 {
296         uint32_t d1 = read_u32((const char *)obj1->data);
297         uint32_t d2 = read_u32((const char *)obj2->data);
298
299         if (d1 < d2)
300                 return 1;
301         if (d1 > d2)
302                 return -1;
303         return 0;
304 }
305
306 /**
307  * Compare two osl objects pointing to hash values.
308  *
309  * \param obj1 Pointer to the first hash object.
310  * \param obj2 Pointer to the second hash object.
311  *
312  * \return The values required for an osl compare function.
313  *
314  * \sa osl_compare_func, uint32_compare().
315  */
316 int osl_hash_compare(const struct osl_object *obj1, const struct osl_object *obj2)
317 {
318         return hash_compare((HASH_TYPE *)obj1->data, (HASH_TYPE *)obj2->data);
319 }
320
321 static char *disk_storage_dirname(const struct osl_table *t, unsigned col_num,
322                 const char *ds_name)
323 {
324         char *dirname, *column_name = column_filename(t, col_num);
325
326         if (!(t->desc->flags & OSL_LARGE_TABLE))
327                 return column_name;
328         dirname = make_message("%s/%.2s", column_name, ds_name);
329         free(column_name);
330         return dirname;
331 }
332
333 static char *disk_storage_name_of_object(const struct osl_table *t,
334         const struct osl_object *obj)
335 {
336         HASH_TYPE hash[HASH_SIZE];
337         hash_object(obj, hash);
338         return disk_storage_name_of_hash(t, hash);
339 }
340
341 static int disk_storage_name_of_row(const struct osl_table *t,
342                 const struct osl_row *row, char **name)
343 {
344         struct osl_object obj;
345         int ret = osl_get_object(t, row, t->disk_storage_name_column, &obj);
346
347         if (ret < 0)
348                 return ret;
349         *name = disk_storage_name_of_object(t, &obj);
350         return 1;
351 }
352
353 static void column_name_hash(const char *col_name, HASH_TYPE *hash)
354 {
355         return hash_function(col_name, strlen(col_name), hash);
356 }
357
358 static int init_column_descriptions(struct osl_table *t)
359 {
360         int i, j, ret;
361         const struct osl_column_description *cd;
362
363         ret = -E_BAD_TABLE_DESC;
364         ret = verify_name(t->desc->name);
365         if (ret < 0)
366                 goto err;
367         ret = -E_BAD_DB_DIR;
368         if (!t->desc->dir)
369                 goto err;
370         /* the size of the index header without column descriptions */
371         t->index_header_size = IDX_COLUMN_DESCRIPTIONS;
372         FOR_EACH_COLUMN(i, t->desc, cd) {
373                 struct osl_column *col = t->columns + i;
374                 if (cd->storage_flags & OSL_RBTREE) {
375                         if (!cd->compare_function)
376                                 return -E_NO_COMPARE_FUNC;
377                 }
378                 if (cd->storage_type == OSL_NO_STORAGE)
379                         continue;
380                 ret = -E_NO_COLUMN_NAME;
381                 if (!cd->name || !cd->name[0])
382                         goto err;
383                 ret = verify_name(cd->name);
384                 if (ret < 0)
385                         goto err;
386                 t->index_header_size += index_column_description_size(cd->name);
387                 column_name_hash(cd->name, col->name_hash);
388                 ret = -E_DUPLICATE_COL_NAME;
389                 for (j = i + 1; j < t->desc->num_columns; j++) {
390                         const char *name2 = get_column_description(t->desc,
391                                 j)->name;
392                         if (cd->name && name2 && !strcmp(cd->name, name2))
393                                 goto err;
394                 }
395         }
396         return 1;
397 err:
398         return ret;
399 }
400
401 /**
402  * Initialize a struct table from given table description.
403  *
404  * \param desc The description of the osl table.
405  * \param table_ptr Result is returned here.
406  *
407  * This function performs several sanity checks on \p desc and returns if any
408  * of these tests fail. On success, a struct \p osl_table is allocated and
409  * initialized with data derived from \p desc.
410  *
411  * \return Positive on success, negative on errors. Possible errors include: \p
412  * E_BAD_TABLE_DESC, \p E_NO_COLUMN_DESC, \p E_NO_COLUMNS, \p
413  * E_BAD_STORAGE_TYPE, \p E_BAD_STORAGE_FLAGS, \p E_BAD_STORAGE_SIZE, \p
414  * E_NO_UNIQUE_RBTREE_COLUMN, \p E_NO_RBTREE_COL.
415  *
416  * \sa struct osl_table.
417  */
418 int init_table_structure(const struct osl_table_description *desc,
419                 struct osl_table **table_ptr)
420 {
421         const struct osl_column_description *cd;
422         struct osl_table *t = para_calloc(sizeof(*t));
423         int i, ret = -E_BAD_TABLE_DESC, have_disk_storage_name_column = 0;
424
425         if (!desc)
426                 goto err;
427         PARA_DEBUG_LOG("creating table structure for '%s' from table "
428                 "description\n", desc->name);
429         ret = -E_NO_COLUMN_DESC;
430         if (!desc->column_descriptions)
431                 goto err;
432         ret = -E_NO_COLUMNS;
433         if (!desc->num_columns)
434                 goto err;
435         t->columns = para_calloc(desc->num_columns * sizeof(struct osl_column));
436         t->desc = desc;
437         FOR_EACH_COLUMN(i, t->desc, cd) {
438                 enum osl_storage_type st = cd->storage_type;
439                 enum osl_storage_flags sf = cd->storage_flags;
440                 struct osl_column *col = &t->columns[i];
441
442                 ret = -E_BAD_STORAGE_TYPE;
443                 if (st != OSL_MAPPED_STORAGE && st != OSL_DISK_STORAGE
444                                 && st != OSL_NO_STORAGE)
445                         goto err;
446                 ret = -E_BAD_STORAGE_FLAGS;
447                 if (st == OSL_DISK_STORAGE && sf & OSL_RBTREE)
448                         goto err;
449                 ret = -E_BAD_STORAGE_SIZE;
450                 if (sf & OSL_FIXED_SIZE && !cd->data_size)
451                         goto err;
452                 switch (st) {
453                 case OSL_DISK_STORAGE:
454                         t->num_disk_storage_columns++;
455                         break;
456                 case OSL_MAPPED_STORAGE:
457                         t->num_mapped_columns++;
458                         col->index_offset = t->row_index_size;
459                         t->row_index_size += 8;
460                         break;
461                 case OSL_NO_STORAGE:
462                         col->volatile_num = t->num_volatile_columns;
463                         t->num_volatile_columns++;
464                         break;
465                 }
466                 if (sf & OSL_RBTREE) {
467                         col->rbtree_num = t->num_rbtrees;
468                         t->num_rbtrees++;
469                         if ((sf & OSL_UNIQUE) && (st == OSL_MAPPED_STORAGE)) {
470                                 if (!have_disk_storage_name_column)
471                                         t->disk_storage_name_column = i;
472                                 have_disk_storage_name_column = 1;
473                         }
474                 }
475         }
476         ret = -E_NO_UNIQUE_RBTREE_COLUMN;
477         if (t->num_disk_storage_columns && !have_disk_storage_name_column)
478                 goto err;
479         ret = -E_NO_RBTREE_COL;
480         if (!t->num_rbtrees)
481                 goto err;
482         /* success */
483         PARA_DEBUG_LOG("OK. Index entry size: %u\n", t->row_index_size);
484         ret = init_column_descriptions(t);
485         if (ret < 0)
486                 goto err;
487         *table_ptr = t;
488         return 1;
489 err:
490         free(t->columns);
491         free(t);
492         return ret;
493 }
494
495 /**
496  * Read the table description from index header.
497  *
498  * \param map The memory mapping of the index file.
499  * \param desc The values found in the index header are returned here.
500  *
501  * Read the index header, check for the paraslash magic string and the table version number.
502  * Read all information stored in the index header into \a desc.
503  *
504  * \return Positive on success, negative on errors.
505  *
506  * \sa struct osl_table_description, osl_create_table.
507  */
508 int read_table_desc(struct osl_object *map, struct osl_table_description *desc)
509 {
510         char *buf = map->data;
511         uint8_t version;
512         uint16_t header_size;
513         int ret, i;
514         unsigned offset;
515         struct osl_column_description *cd;
516
517         if (map->size < MIN_INDEX_HEADER_SIZE(1))
518                 return -E_SHORT_TABLE;
519         if (strncmp(buf + IDX_PARA_MAGIC, PARA_MAGIC, strlen(PARA_MAGIC)))
520                 return -E_NO_MAGIC;
521         version = read_u8(buf + IDX_VERSION);
522         if (version < MIN_TABLE_VERSION || version > MAX_TABLE_VERSION)
523                 return -E_VERSION_MISMATCH;
524         desc->num_columns = read_u8(buf + IDX_TABLE_FLAGS);
525         desc->flags = read_u8(buf + IDX_TABLE_FLAGS);
526         desc->num_columns = read_u16(buf + IDX_NUM_COLUMNS);
527         PARA_DEBUG_LOG("%u columns\n", desc->num_columns);
528         if (!desc->num_columns)
529                 return -E_NO_COLUMNS;
530         header_size = read_u16(buf + IDX_HEADER_SIZE);
531         if (map->size < header_size)
532                 return -E_BAD_SIZE;
533         desc->column_descriptions = para_calloc(desc->num_columns
534                 * sizeof(struct osl_column_description));
535         offset = IDX_COLUMN_DESCRIPTIONS;
536         FOR_EACH_COLUMN(i, desc, cd) {
537                 char *null_byte;
538
539                 ret = -E_SHORT_TABLE;
540                 if (map->size < offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE) {
541                         PARA_ERROR_LOG("map size = %zu < %u = offset + min desc size\n",
542                                 map->size, offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE);
543                         goto err;
544                 }
545                 cd->storage_type = read_u16(buf + offset + IDX_CD_STORAGE_TYPE);
546                 cd->storage_flags = read_u16(buf + offset +
547                         IDX_CD_STORAGE_FLAGS);
548                 cd->data_size = read_u32(buf + offset + IDX_CD_DATA_SIZE);
549                 null_byte = memchr(buf + offset + IDX_CD_NAME, '\0',
550                         map->size - offset - IDX_CD_NAME);
551                 ret = -E_INDEX_CORRUPTION;
552                 if (!null_byte)
553                         goto err;
554                 cd->name = para_strdup(buf + offset + IDX_CD_NAME);
555                 offset += index_column_description_size(cd->name);
556         }
557         if (offset != header_size) {
558                 ret = -E_INDEX_CORRUPTION;
559                 PARA_ERROR_LOG("real header size = %u != %u = stored header size\n",
560                         offset, header_size);
561                 goto err;
562         }
563         return 1;
564 err:
565         FOR_EACH_COLUMN(i, desc, cd)
566                 free(cd->name);
567         return ret;
568 }
569
570 /*
571  * check whether the table description given by \p t->desc matches the on-disk
572  * table structure stored in the index of \a t.
573  */
574 static int compare_table_descriptions(struct osl_table *t)
575 {
576         int i, ret;
577         struct osl_table_description desc;
578         const struct osl_column_description *cd1, *cd2;
579
580         /* read the on-disk structure into desc */
581         ret = read_table_desc(&t->index_map, &desc);
582         if (ret < 0)
583                 return ret;
584         ret = -E_BAD_TABLE_FLAGS;
585         if (desc.flags != t->desc->flags)
586                 goto out;
587         ret = -E_BAD_COLUMN_NUM;
588         if (desc.num_columns != t->desc->num_columns)
589                 goto out;
590         FOR_EACH_COLUMN(i, t->desc, cd1) {
591                 cd2 = get_column_description(&desc, i);
592                 ret = -E_BAD_STORAGE_TYPE;
593                 if (cd1->storage_type != cd2->storage_type)
594                         goto out;
595                 ret = -E_BAD_STORAGE_FLAGS;
596                 if (cd1->storage_flags != cd2->storage_flags) {
597                         PARA_ERROR_LOG("sf1 = %u != %u = sf2\n",
598                                 cd1->storage_flags, cd2->storage_flags);
599                         goto out;
600                 }
601                 ret = -E_BAD_DATA_SIZE;
602                 if (cd1->storage_flags & OSL_FIXED_SIZE)
603                         if (cd1->data_size != cd2->data_size)
604                                 goto out;
605                 ret = -E_BAD_COLUMN_NAME;
606                 if (strcmp(cd1->name, cd2->name))
607                         goto out;
608         }
609         PARA_DEBUG_LOG("table description of '%s' matches on-disk data, good\n",
610                 t->desc->name);
611         ret = 1;
612 out:
613         FOR_EACH_COLUMN(i, &desc, cd1)
614                 free(cd1->name);
615         free(desc.column_descriptions);
616         return ret;
617 }
618
619 static int create_table_index(struct osl_table *t)
620 {
621         char *buf, *filename;
622         int i, ret;
623         size_t size = t->index_header_size;
624         const struct osl_column_description *cd;
625         unsigned offset;
626
627         PARA_INFO_LOG("creating %zu byte index for table %s\n", size,
628                 t->desc->name);
629         buf = para_calloc(size);
630         sprintf(buf + IDX_PARA_MAGIC, "%s", PARA_MAGIC);
631         write_u8(buf + IDX_TABLE_FLAGS, t->desc->flags);
632         write_u8(buf + IDX_DIRTY_FLAG, 0);
633         write_u8(buf + IDX_VERSION, CURRENT_TABLE_VERSION);
634         write_u16(buf + IDX_NUM_COLUMNS, t->desc->num_columns);
635         write_u16(buf + IDX_HEADER_SIZE, t->index_header_size);
636         offset = IDX_COLUMN_DESCRIPTIONS;
637         FOR_EACH_COLUMN(i, t->desc, cd) {
638                 write_u16(buf + offset + IDX_CD_STORAGE_TYPE,
639                         cd->storage_type);
640                 write_u16(buf + offset + IDX_CD_STORAGE_FLAGS,
641                         cd->storage_flags);
642                 if (cd->storage_flags & OSL_FIXED_SIZE)
643                         write_u32(buf + offset + IDX_CD_DATA_SIZE,
644                                 cd->data_size);
645                 strcpy(buf + offset + IDX_CD_NAME, cd->name);
646                 offset += index_column_description_size(cd->name);
647         }
648         assert(offset = size);
649         filename = index_filename(t->desc);
650         ret = para_write_file(filename, buf, size);
651         free(buf);
652         free(filename);
653         return ret;
654 }
655
656 /**
657  * Create a new osl table.
658  *
659  * \param desc Pointer to the table description.
660  *
661  * \return Positive on success, negative on errors. Possible errors include: \p
662  * E_BAD_TABLE_DESC, \p E_BAD_DB_DIR, \p E_BAD_NAME, \p E_NO_COMPARE_FUNC, \p
663  * E_NO_COLUMN_NAME, \p E_DUPLICATE_COL_NAME, \p E_MKDIR, any errors returned
664  * by para_open().
665  */
666 int osl_create_table(const struct osl_table_description *desc)
667 {
668         const struct osl_column_description *cd;
669         char *table_dir = NULL, *filename;
670         struct osl_table *t;
671         int i, ret = init_table_structure(desc, &t);
672
673         if (ret < 0)
674                 return ret;
675         PARA_INFO_LOG("creating %s\n", desc->name);
676         FOR_EACH_COLUMN(i, t->desc, cd) {
677                 if (cd->storage_type == OSL_NO_STORAGE)
678                         continue;
679                 if (!table_dir) {
680                         ret = para_mkdir(desc->dir, 0777);
681                         if (ret < 0 && ret != -E_EXIST)
682                                 goto out;
683                         table_dir = make_message("%s/%s", desc->dir,
684                                 desc->name);
685                         ret = para_mkdir(table_dir, 0777);
686                         if (ret < 0)
687                                 goto out;
688                 }
689                 filename = column_filename(t, i);
690                 PARA_INFO_LOG("filename: %s\n", filename);
691                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
692                         ret = para_open(filename, O_RDWR | O_CREAT | O_EXCL,
693                                 0644);
694                         free(filename);
695                         if (ret < 0)
696                                 goto out;
697                         close(ret);
698                         continue;
699                 }
700                 /* DISK STORAGE */
701                 ret = para_mkdir(filename, 0777);
702                 free(filename);
703                 if (ret < 0)
704                         goto out;
705         }
706         if (t->num_mapped_columns) {
707                 ret = create_table_index(t);
708                 if (ret < 0)
709                         goto out;
710         }
711         ret = 1;
712 out:
713         free(table_dir);
714         free(t->columns);
715         free(t);
716         return ret;
717 }
718
719 static int table_is_dirty(struct osl_table *t)
720 {
721         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
722         uint8_t dirty = read_u8(buf) & 0x1;
723         return !!dirty;
724 }
725
726 static void mark_table_dirty(struct osl_table *t)
727 {
728         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
729         write_u8(buf, read_u8(buf) | 1);
730 }
731
732 static void mark_table_clean(struct osl_table *t)
733 {
734         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
735         write_u8(buf, read_u8(buf) & 0xfe);
736 }
737
738 static void unmap_column(struct osl_table *t, unsigned col_num)
739 {
740         struct osl_object map = t->columns[col_num].data_map;
741         int ret;
742         if (!map.data)
743                 return;
744         ret = para_munmap(map.data, map.size);
745         assert(ret > 0);
746         map.data = NULL;
747 }
748
749 /**
750  * Unmap all mapped files of an osl table.
751  *
752  * \param t Pointer to a mapped table.
753  * \param flags Options for unmapping.
754  *
755  * \return Positive on success, negative on errors.
756  *
757  * \sa map_table(), enum osl_close_flags, para_munmap().
758  */
759 int unmap_table(struct osl_table *t, enum osl_close_flags flags)
760 {
761         unsigned i;
762         const struct osl_column_description *cd;
763         int ret;
764
765         if (!t->num_mapped_columns) /* can this ever happen? */
766                 return 1;
767         PARA_DEBUG_LOG("unmapping table '%s'\n", t->desc->name);
768         if (!t->index_map.data)
769                 return -E_NOT_MAPPED;
770         if (flags & OSL_MARK_CLEAN)
771                 mark_table_clean(t);
772         ret = para_munmap(t->index_map.data, t->index_map.size);
773         if (ret < 0)
774                 return ret;
775         t->index_map.data = NULL;
776         if (!t->num_rows)
777                 return 1;
778         FOR_EACH_MAPPED_COLUMN(i, t, cd)
779                 unmap_column(t, i);
780         return 1;
781 }
782
783 static int map_column(struct osl_table *t, unsigned col_num)
784 {
785         struct stat statbuf;
786         char *filename = column_filename(t, col_num);
787         int ret = -E_STAT;
788         if (stat(filename, &statbuf) < 0) {
789                 free(filename);
790                 return ret;
791         }
792         if (!(S_IFREG & statbuf.st_mode)) {
793                 free(filename);
794                 return ret;
795         }
796         ret = mmap_full_file(filename, O_RDWR,
797                 &t->columns[col_num].data_map);
798         free(filename);
799         return ret;
800 }
801
802 /**
803  * Map the index file and all columns of type \p OSL_MAPPED_STORAGE into memory.
804  *
805  * \param t Pointer to an initialized table structure.
806  * \param flags Mapping options.
807  *
808  * \return Negative return value on errors; on success the number of rows
809  * (including invalid rows) is returned.
810  *
811  * \sa unmap_table(), enum map_table_flags, osl_open_table(), mmap(2).
812  */
813 int map_table(struct osl_table *t, enum map_table_flags flags)
814 {
815         char *filename;
816         const struct osl_column_description *cd;
817         int i = 0, ret, num_rows = 0;
818
819         if (!t->num_mapped_columns)
820                 return 0;
821         if (t->index_map.data)
822                 return -E_ALREADY_MAPPED;
823         filename = index_filename(t->desc);
824         PARA_DEBUG_LOG("mapping table '%s' (index: %s)\n", t->desc->name, filename);
825         ret = mmap_full_file(filename, flags & MAP_TBL_FL_MAP_RDONLY?
826                 O_RDONLY : O_RDWR, &t->index_map);
827         free(filename);
828         if (ret < 0)
829                 return ret;
830         if (flags & MAP_TBL_FL_VERIFY_INDEX) {
831                 ret = compare_table_descriptions(t);
832                 if (ret < 0)
833                         goto err;
834         }
835         ret = -E_BUSY;
836         if (!(flags & MAP_TBL_FL_IGNORE_DIRTY)) {
837                 if (table_is_dirty(t)) {
838                         PARA_ERROR_LOG("%s is dirty\n", t->desc->name);
839                         goto err;
840                 }
841         }
842         mark_table_dirty(t);
843         num_rows = table_num_rows(t);
844         if (!num_rows)
845                 return num_rows;
846         /* map data files */
847         FOR_EACH_MAPPED_COLUMN(i, t, cd) {
848                 ret = map_column(t, i);
849                 if (ret < 0)
850                         goto err;
851         }
852         return num_rows;
853 err:    /* unmap what is already mapped */
854         for (i--; i >= 0; i--) {
855                 struct osl_object map = t->columns[i].data_map;
856                 para_munmap(map.data, map.size);
857                 map.data = NULL;
858         }
859         para_munmap(t->index_map.data, t->index_map.size);
860         t->index_map.data = NULL;
861         return ret;
862 }
863
864 /**
865  * Retrieve a mapped object by row and column number.
866  *
867  * \param t Pointer to an open osl table.
868  * \param col_num Number of the mapped column containing the object to retrieve.
869  * \param row_num Number of the row containing the object to retrieve.
870  * \param obj The result is returned here.
871  *
872  * It is considered an error if \a col_num does not refer to a column
873  * of storage type \p OSL_MAPPED_STORAGE.
874  *
875  * \return Positive on success, negative on errors. Possible errors include:
876  * \p E_BAD_ROW_NUM, \p E_INVALID_OBJECT.
877  *
878  * \sa osl_storage_type.
879  */
880 int get_mapped_object(const struct osl_table *t, unsigned col_num,
881         uint32_t row_num, struct osl_object *obj)
882 {
883         struct osl_column *col = &t->columns[col_num];
884         uint32_t offset;
885         char *header;
886         char *cell_index;
887         int ret;
888
889         if (t->num_rows <= row_num)
890                 return -E_BAD_ROW_NUM;
891         ret = get_cell_index(t, row_num, col_num, &cell_index);
892         if (ret < 0)
893                 return ret;
894         offset = read_u32(cell_index);
895         obj->size = read_u32(cell_index + 4) - 1;
896         header = col->data_map.data + offset;
897         obj->data = header + 1;
898         if (read_u8(header) == 0xff) {
899                 PARA_ERROR_LOG("col %u, size %zu, offset %u\n", col_num,
900                         obj->size, offset);
901                 return -E_INVALID_OBJECT;
902         }
903         return 1;
904 }
905
906 static int search_rbtree(const struct osl_object *obj,
907                 const struct osl_table *t, unsigned col_num,
908                 struct rb_node **result, struct rb_node ***rb_link)
909 {
910         struct osl_column *col = &t->columns[col_num];
911         struct rb_node **new = &col->rbtree.rb_node, *parent = NULL;
912         const struct osl_column_description *cd =
913                 get_column_description(t->desc, col_num);
914         enum osl_storage_type st = cd->storage_type;
915         while (*new) {
916                 struct osl_row *this_row = get_row_pointer(*new,
917                         col->rbtree_num);
918                 int ret;
919                 struct osl_object this_obj;
920                 parent = *new;
921                 if (st == OSL_MAPPED_STORAGE) {
922                         ret = get_mapped_object(t, col_num, this_row->num,
923                                 &this_obj);
924                         if (ret < 0)
925                                 return ret;
926                 } else
927                         this_obj = this_row->volatile_objects[col->volatile_num];
928                 ret = cd->compare_function(obj, &this_obj);
929                 if (!ret) {
930                         if (result)
931                                 *result = get_rb_node_pointer(this_row,
932                                         col->rbtree_num);
933                         return 1;
934                 }
935                 if (ret < 0)
936                         new = &((*new)->rb_left);
937                 else
938                         new = &((*new)->rb_right);
939         }
940         if (result)
941                 *result = parent;
942         if (rb_link)
943                 *rb_link = new;
944         return -E_RB_KEY_NOT_FOUND;
945 }
946
947 static int insert_rbtree(struct osl_table *t, unsigned col_num,
948         const struct osl_row *row, const struct osl_object *obj)
949 {
950         struct rb_node *parent, **rb_link;
951         unsigned rbtree_num;
952         struct rb_node *n;
953         int ret = search_rbtree(obj, t, col_num, &parent, &rb_link);
954
955         if (ret > 0)
956                 return -E_RB_KEY_EXISTS;
957         rbtree_num = t->columns[col_num].rbtree_num;
958         n = get_rb_node_pointer(row, rbtree_num);
959         rb_link_node(n, parent, rb_link);
960         rb_insert_color(n, &t->columns[col_num].rbtree);
961         return 1;
962 }
963
964 static void remove_rb_node(struct osl_table *t, unsigned col_num,
965                 const struct osl_row *row)
966 {
967         struct osl_column *col = &t->columns[col_num];
968         const struct osl_column_description *cd =
969                 get_column_description(t->desc, col_num);
970         enum osl_storage_flags sf = cd->storage_flags;
971         struct rb_node *victim, *splice_out_node, *tmp;
972         if (!(sf & OSL_RBTREE))
973                 return;
974         /*
975          * Which node is removed/spliced out actually depends on how many
976          * children the victim node has: If it has no children, it gets
977          * deleted. If it has one child, it gets spliced out. If it has two
978          * children, its successor (which has at most a right child) gets
979          * spliced out.
980          */
981         victim = get_rb_node_pointer(row, col->rbtree_num);
982         if (victim->rb_left && victim->rb_right)
983                 splice_out_node = rb_next(victim);
984         else
985                 splice_out_node = victim;
986         /* Go up to the root and decrement the size of each node in the path. */
987         for (tmp = splice_out_node; tmp; tmp = rb_parent(tmp))
988                 tmp->size--;
989         rb_erase(victim, &col->rbtree);
990 }
991
992 static int add_row_to_rbtrees(struct osl_table *t, uint32_t row_num,
993                 struct osl_object *volatile_objs, struct osl_row **row_ptr)
994 {
995         unsigned i;
996         int ret;
997         struct osl_row *row = allocate_row(t->num_rbtrees);
998         const struct osl_column_description *cd;
999
1000         row->num = row_num;
1001         row->volatile_objects = volatile_objs;
1002         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1003                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
1004                         struct osl_object obj;
1005                         ret = get_mapped_object(t, i, row_num, &obj);
1006                         if (ret < 0)
1007                                 goto err;
1008                         ret = insert_rbtree(t, i, row, &obj);
1009                 } else { /* volatile */
1010                         const struct osl_object *obj
1011                                 = volatile_objs + t->columns[i].volatile_num;
1012                         ret = insert_rbtree(t, i, row, obj);
1013                 }
1014                 if (ret < 0)
1015                         goto err;
1016         }
1017         if (row_ptr)
1018                 *row_ptr = row;
1019         return 1;
1020 err: /* rollback changes, i.e. remove added entries from rbtrees */
1021         while (i)
1022                 remove_rb_node(t, i--, row);
1023         free(row);
1024         return ret;
1025 }
1026
1027 static void free_volatile_objects(const struct osl_table *t,
1028                 enum osl_close_flags flags)
1029 {
1030         int i, j;
1031         struct rb_node *n;
1032         struct osl_column *rb_col;
1033         const struct osl_column_description *cd;
1034
1035         if (!t->num_volatile_columns)
1036                 return;
1037         /* find the first rbtree column (any will do) */
1038         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1039                 break;
1040         rb_col = t->columns + i;
1041         /* walk that rbtree and free all volatile objects */
1042         for (n = rb_first(&rb_col->rbtree); n; n = rb_next(n)) {
1043                 struct osl_row *r = get_row_pointer(n, rb_col->rbtree_num);
1044                 if (flags & OSL_FREE_VOLATILE)
1045                         for (j = 0; j < t->num_volatile_columns; j++)
1046                                 free(r->volatile_objects[j].data);
1047                 free(r->volatile_objects);
1048         }
1049 }
1050
1051 /**
1052  * Erase all rbtree nodes and free resources.
1053  *
1054  * \param t Pointer to an open osl table.
1055  *
1056  * This function is called by osl_close_table().
1057  */
1058 void clear_rbtrees(struct osl_table *t)
1059 {
1060         const struct osl_column_description *cd;
1061         unsigned i, rbtrees_cleared = 0;
1062
1063         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1064                 struct osl_column *col = &t->columns[i];
1065                 struct rb_node *n;
1066                 rbtrees_cleared++;
1067                 for (n = rb_first(&col->rbtree); n;) {
1068                         struct osl_row *r;
1069                         rb_erase(n, &col->rbtree);
1070                         if (rbtrees_cleared == t->num_rbtrees) {
1071                                 r = get_row_pointer(n, col->rbtree_num);
1072                                 n = rb_next(n);
1073                                 free(r);
1074                         } else
1075                                 n = rb_next(n);
1076                 }
1077         }
1078
1079 }
1080
1081 /**
1082  * Close an osl table.
1083  *
1084  * \param t Pointer to the table to be closed.
1085  * \param flags Options for what should be cleaned up.
1086  *
1087  * If osl_open_table() succeeds, the resulting table pointer must later be
1088  * passed to this function in order to flush all changes to the filesystem and
1089  * to free the resources that were allocated by osl_open_table().
1090  *
1091  * \return Positive on success, negative on errors. Possible errors: \p E_BAD_TABLE,
1092  * errors returned by unmap_table().
1093  *
1094  * \sa osl_open_table(), unmap_table().
1095  */
1096 int osl_close_table(struct osl_table *t, enum osl_close_flags flags)
1097 {
1098         int ret;
1099
1100         if (!t)
1101                 return -E_BAD_TABLE;
1102         free_volatile_objects(t, flags);
1103         clear_rbtrees(t);
1104         ret = unmap_table(t, flags);
1105         if (ret < 0)
1106                 PARA_ERROR_LOG("unmap_table failed: %d\n", ret);
1107         free(t->columns);
1108         free(t);
1109         return ret;
1110 }
1111
1112 /**
1113  * Find out whether the given row number corresponds to an invalid row.
1114  *
1115  * \param t Pointer to the osl table.
1116  * \param row_num The number of the row in question.
1117  *
1118  * By definition, a row is considered invalid if all its index entries
1119  * are invalid.
1120  *
1121  * \return Positive if \a row_num corresponds to an invalid row,
1122  * zero if it corresponds to a valid row, negative on errors.
1123  */
1124 int row_is_invalid(struct osl_table *t, uint32_t row_num)
1125 {
1126         char *row_index;
1127         int i, ret = get_row_index(t, row_num, &row_index);
1128
1129         if (ret < 0)
1130                 return ret;
1131         for (i = 0; i < t->row_index_size; i++) {
1132                 if ((unsigned char)row_index[i] != 0xff)
1133                         return 0;
1134         }
1135         PARA_INFO_LOG("row %d is invalid\n", row_num);
1136         return 1;
1137 }
1138
1139 /**
1140  * Invalidate a row of an osl table.
1141  *
1142  * \param t Pointer to an open osl table.
1143  * \param row_num Number of the row to mark as invalid.
1144  *
1145  * This function marks each mapped object in the index entry of \a row as
1146  * invalid.
1147  *
1148  * \return Positive on success, negative on errors.
1149  */
1150 int mark_row_invalid(struct osl_table *t, uint32_t row_num)
1151 {
1152         char *row_index;
1153         int ret = get_row_index(t, row_num, &row_index);
1154
1155         if (ret < 0)
1156                 return ret;
1157         PARA_INFO_LOG("marking row %d as invalid\n", row_num);
1158         memset(row_index, 0xff, t->row_index_size);
1159         return 1;
1160 }
1161
1162 /**
1163  * Initialize all rbtrees and compute number of invalid rows.
1164  *
1165  * \param t The table containing the rbtrees to be initialized.
1166  *
1167  * \return Positive on success, negative on errors.
1168  */
1169 int init_rbtrees(struct osl_table *t)
1170 {
1171         int i, ret;
1172         const struct osl_column_description *cd;
1173
1174         /* create rbtrees */
1175         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1176                 t->columns[i].rbtree = RB_ROOT;
1177         /* add valid rows to rbtrees */
1178         t->num_invalid_rows = 0;
1179         for (i = 0; i < t->num_rows; i++) {
1180                 ret = row_is_invalid(t, i);
1181                 if (ret < 0)
1182                         return ret;
1183                 if (ret) {
1184                         t->num_invalid_rows++;
1185                         continue;
1186                 }
1187                 ret = add_row_to_rbtrees(t, i, NULL, NULL);
1188                 if (ret < 0)
1189                         return ret;
1190         }
1191         return 1;
1192 }
1193
1194 /**
1195  * Open an osl table.
1196  *
1197  * Each osl table must be opened before its data can be accessed.
1198  *
1199  * \param table_desc Describes the table to be opened.
1200  * \param result Contains a pointer to the open table on success.
1201  *
1202  * The table description given by \a desc should coincide with the
1203  * description used at creation time.
1204  *
1205  * \return Positive on success, negative on errors. Possible errors include:
1206  * errors returned by init_table_structure(), \p E_NOENT, \p E_STAT, \p \p
1207  * E_NOTDIR, \p E_BAD_TABLE_DESC, \p E_BAD_DB_DIR, \p E_NO_COMPARE_FUNC, \p
1208  * E_NO_COLUMN_NAME, errors returned by init_rbtrees().
1209  */
1210 int osl_open_table(const struct osl_table_description *table_desc,
1211                 struct osl_table **result)
1212 {
1213         int i, ret;
1214         struct osl_table *t;
1215         const struct osl_column_description *cd;
1216
1217         PARA_INFO_LOG("opening table %s\n", table_desc->name);
1218         ret = init_table_structure(table_desc, &t);
1219         if (ret < 0)
1220                 return ret;
1221         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1222                 /* check if directory exists */
1223                 char *dirname = column_filename(t, i);
1224                 struct stat statbuf;
1225                 ret = stat(dirname, &statbuf);
1226                 free(dirname);
1227                 if (ret < 0) {
1228                         if (errno == ENOENT)
1229                                 ret = -E_NOENT;
1230                         else
1231                                 ret = -E_STAT;
1232                         goto err;
1233                 }
1234                 ret = -E_NOTDIR;
1235                 if (!S_ISDIR(statbuf.st_mode))
1236                         goto err;
1237         }
1238         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1239         if (ret < 0)
1240                 goto err;
1241         t->num_rows = ret;
1242         PARA_DEBUG_LOG("num rows: %d\n", t->num_rows);
1243         ret = init_rbtrees(t);
1244         if (ret < 0) {
1245                 osl_close_table(t, OSL_MARK_CLEAN); /* ignore further errors */
1246                 return ret;
1247         }
1248         *result = t;
1249         return 1;
1250 err:
1251         free(t->columns);
1252         free(t);
1253         return ret;
1254 }
1255
1256 static int create_disk_storage_object_dir(const struct osl_table *t,
1257                 unsigned col_num, const char *ds_name)
1258 {
1259         char *dirname;
1260         int ret;
1261
1262         if (!(t->desc->flags & OSL_LARGE_TABLE))
1263                 return 1;
1264         dirname = disk_storage_dirname(t, col_num, ds_name);
1265         ret = para_mkdir(dirname, 0777);
1266         free(dirname);
1267         if (ret < 0 && ret != -E_EXIST)
1268                 return ret;
1269         return 1;
1270 }
1271
1272 static int write_disk_storage_file(const struct osl_table *t, unsigned col_num,
1273         const struct osl_object *obj, const char *ds_name)
1274 {
1275         int ret;
1276         char *filename;
1277
1278         ret = create_disk_storage_object_dir(t, col_num, ds_name);
1279         if (ret < 0)
1280                 return ret;
1281         filename = disk_storage_path(t, col_num, ds_name);
1282         ret = para_write_file(filename, obj->data, obj->size);
1283         free(filename);
1284         return ret;
1285 }
1286
1287 static int append_map_file(const struct osl_table *t, unsigned col_num,
1288         const struct osl_object *obj, uint32_t *new_size)
1289 {
1290         char *filename = column_filename(t, col_num);
1291         int ret;
1292         char header = 0; /* zero means valid object */
1293
1294 //      PARA_DEBUG_LOG("appending %zu + 1 byte\n", obj->size);
1295         ret = append_file(filename, &header, 1, obj->data, obj->size,
1296                 new_size);
1297         free(filename);
1298         return ret;
1299 }
1300
1301 static int append_row_index(const struct osl_table *t, char *row_index)
1302 {
1303         char *filename;
1304         int ret;
1305
1306         if (!t->num_mapped_columns)
1307                 return 1;
1308         filename = index_filename(t->desc);
1309         ret = append_file(filename, NULL, 0, row_index,
1310                 t->row_index_size, NULL);
1311         free(filename);
1312         return ret;
1313 }
1314
1315 /**
1316  * A wrapper for truncate(2)
1317  *
1318  * \param path Name of the regular file to truncate
1319  * \param size Number of bytes to \b shave \b off
1320  *
1321  * Truncate the regular file named by \a path by \a size bytes.
1322  *
1323  * \return Positive on success, negative on errors. Possible errors include: \p
1324  * E_STAT, \p E_BAD_SIZE, \p E_TRUNC.
1325  *
1326  * \sa truncate(2)
1327  */
1328 int para_truncate(const char *path, off_t size)
1329 {
1330         int ret;
1331         struct stat statbuf;
1332
1333         ret = -E_STAT;
1334         if (stat(path, &statbuf) < 0)
1335                 goto out;
1336         ret = -E_BAD_SIZE;
1337         if (statbuf.st_size < size)
1338                 goto out;
1339         ret = -E_TRUNC;
1340         if (truncate(path, statbuf.st_size - size) < 0)
1341                 goto out;
1342         ret = 1;
1343 out:
1344         return ret;
1345 }
1346
1347 static int truncate_mapped_file(const struct osl_table *t, unsigned col_num,
1348                 off_t size)
1349 {
1350         char *filename = column_filename(t, col_num);
1351         int ret = para_truncate(filename, size);
1352         free(filename);
1353         return ret;
1354 }
1355
1356 static int delete_disk_storage_file(const struct osl_table *t, unsigned col_num,
1357                 const char *ds_name)
1358 {
1359         char *dirname, *filename = disk_storage_path(t, col_num, ds_name);
1360         int ret = unlink(filename);
1361
1362         PARA_DEBUG_LOG("deleted %s\n", filename);
1363         free(filename);
1364         if (ret < 0) {
1365                 if (errno == ENOENT)
1366                         return -E_NOENT;
1367                 return -E_UNLINK;
1368         }
1369         if (!(t->desc->flags & OSL_LARGE_TABLE))
1370                 return 1;
1371         dirname = disk_storage_dirname(t, col_num, ds_name);
1372         rmdir(dirname);
1373         free(dirname);
1374         return 1;
1375 }
1376
1377 /**
1378  * Add a new row to an osl table and retrieve this row.
1379  *
1380  * \param t Pointer to an open osl table.
1381  * \param objects Array of objects to be added.
1382  * \param row Result pointer.
1383  *
1384  * The \a objects parameter must point to an array containing one object per
1385  * column.  The order of the objects in the array is given by the table
1386  * description of \a table. Several sanity checks are performed during object
1387  * insertion and the function returns without modifying the table if any of
1388  * these tests fail.  In fact, it is atomic in the sense that it either
1389  * succeeds or leaves the table unchanged (i.e. either all or none of the
1390  * objects are added to the table).
1391  *
1392  * It is considered an error if an object is added to a column with associated
1393  * rbtree if this object is equal to an object already contained in that column
1394  * (i.e. the compare function for the column's rbtree returns zero).
1395  *
1396  * Possible errors include: \p E_RB_KEY_EXISTS, \p E_BAD_DATA_SIZE.
1397  *
1398  * \return Positive on success, negative on errors.
1399  *
1400  * \sa struct osl_table_description, osl_compare_func, osl_add_row().
1401  */
1402 int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects,
1403                 struct osl_row **row)
1404 {
1405         int i, ret;
1406         char *ds_name = NULL;
1407         struct rb_node **rb_parents = NULL, ***rb_links = NULL;
1408         char *new_row_index = NULL;
1409         struct osl_object *volatile_objs = NULL;
1410         const struct osl_column_description *cd;
1411
1412         if (!t)
1413                 return -E_BAD_TABLE;
1414         rb_parents = para_malloc(t->num_rbtrees * sizeof(struct rn_node*));
1415         rb_links = para_malloc(t->num_rbtrees * sizeof(struct rn_node**));
1416         if (t->num_mapped_columns)
1417                 new_row_index = para_malloc(t->row_index_size);
1418         /* pass 1: sanity checks */
1419 //      PARA_DEBUG_LOG("sanity tests: %p:%p\n", objects[0].data,
1420 //              objects[1].data);
1421         FOR_EACH_COLUMN(i, t->desc, cd) {
1422                 enum osl_storage_type st = cd->storage_type;
1423                 enum osl_storage_flags sf = cd->storage_flags;
1424
1425 //              ret = -E_NULL_OBJECT;
1426 //              if (!objects[i])
1427 //                      goto out;
1428                 if (st == OSL_DISK_STORAGE)
1429                         continue;
1430                 if (sf & OSL_RBTREE) {
1431                         unsigned rbtree_num = t->columns[i].rbtree_num;
1432                         ret = -E_RB_KEY_EXISTS;
1433 //                      PARA_DEBUG_LOG("checking whether %p exists\n",
1434 //                              objects[i].data);
1435                         if (search_rbtree(objects + i, t, i,
1436                                         &rb_parents[rbtree_num],
1437                                         &rb_links[rbtree_num]) > 0)
1438                                 goto out;
1439                 }
1440                 if (sf & OSL_FIXED_SIZE) {
1441 //                      PARA_DEBUG_LOG("fixed size. need: %zu, have: %d\n",
1442 //                              objects[i].size, cd->data_size);
1443                         ret = -E_BAD_DATA_SIZE;
1444                         if (objects[i].size != cd->data_size)
1445                                 goto out;
1446                 }
1447         }
1448         if (t->num_disk_storage_columns)
1449                 ds_name = disk_storage_name_of_object(t,
1450                         &objects[t->disk_storage_name_column]);
1451         ret = unmap_table(t, OSL_MARK_CLEAN);
1452         if (ret < 0)
1453                 goto out;
1454 //      PARA_DEBUG_LOG("sanity tests passed%s\n", "");
1455         /* pass 2: create data files, append map data */
1456         FOR_EACH_COLUMN(i, t->desc, cd) {
1457                 enum osl_storage_type st = cd->storage_type;
1458                 if (st == OSL_NO_STORAGE)
1459                         continue;
1460                 if (st == OSL_MAPPED_STORAGE) {
1461                         uint32_t new_size;
1462                         struct osl_column *col = &t->columns[i];
1463 //                      PARA_DEBUG_LOG("appending object of size %zu\n",
1464 //                              objects[i].size);
1465                         ret = append_map_file(t, i, objects + i, &new_size);
1466                         if (ret < 0)
1467                                 goto rollback;
1468                         update_cell_index(new_row_index, col, new_size,
1469                                 objects[i].size);
1470                         continue;
1471                 }
1472                 /* DISK_STORAGE */
1473                 ret = write_disk_storage_file(t, i, objects + i, ds_name);
1474                 if (ret < 0)
1475                         goto rollback;
1476         }
1477         ret = append_row_index(t, new_row_index);
1478         if (ret < 0)
1479                 goto rollback;
1480         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1481         if (ret < 0) { /* truncate index and rollback changes */
1482                 char *filename = index_filename(t->desc);
1483                 para_truncate(filename, t->row_index_size);
1484                 free(filename);
1485                 goto rollback;
1486         }
1487         /* pass 3: add entry to rbtrees */
1488         if (t->num_volatile_columns) {
1489                 volatile_objs = para_calloc(t->num_volatile_columns
1490                         * sizeof(struct osl_object));
1491                 FOR_EACH_VOLATILE_COLUMN(i, t, cd)
1492                         volatile_objs[t->columns[i].volatile_num] = objects[i];
1493         }
1494         t->num_rows++;
1495 //      PARA_DEBUG_LOG("adding new entry as row #%d\n", t->num_rows - 1);
1496         ret = add_row_to_rbtrees(t, t->num_rows - 1, volatile_objs, row);
1497         if (ret < 0)
1498                 goto out;
1499 //      PARA_DEBUG_LOG("added new entry as row #%d\n", t->num_rows - 1);
1500         ret = 1;
1501         goto out;
1502 rollback: /* rollback all changes made, ignore further errors */
1503         for (i--; i >= 0; i--) {
1504                 cd = get_column_description(t->desc, i);
1505                 enum osl_storage_type st = cd->storage_type;
1506                 if (st == OSL_NO_STORAGE)
1507                         continue;
1508
1509                 if (st == OSL_MAPPED_STORAGE)
1510                         truncate_mapped_file(t, i, objects[i].size);
1511                 else /* disk storage */
1512                         delete_disk_storage_file(t, i, ds_name);
1513         }
1514         /* ignore error and return previous error value */
1515         map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1516 out:
1517         free(new_row_index);
1518         free(ds_name);
1519         free(rb_parents);
1520         free(rb_links);
1521         return ret;
1522 }
1523
1524 /**
1525  * Add a new row to an osl table.
1526  *
1527  * \param t Same meaning as osl_add_and_get_row().
1528  * \param objects Same meaning as osl_add_and_get_row().
1529  *
1530  * \return The return value of the underlying call to osl_add_and_get_row().
1531  *
1532  * This is equivalent to osl_add_and_get_row(t, objects, NULL).
1533  */
1534 int osl_add_row(struct osl_table *t, struct osl_object *objects)
1535 {
1536         return osl_add_and_get_row(t, objects, NULL);
1537 }
1538
1539 /**
1540  * Retrieve an object identified by row and column
1541  *
1542  * \param t Pointer to an open osl table.
1543  * \param r Pointer to the row.
1544  * \param col_num The column number.
1545  * \param object The result pointer.
1546  *
1547  * The column determined by \a col_num must be of type \p OSL_MAPPED_STORAGE
1548  * or \p OSL_NO_STORAGE, i.e. no disk storage objects may be retrieved by this
1549  * function.
1550  *
1551  * \return Positive if object was found, negative on errors. Possible errors
1552  * include: \p E_BAD_TABLE, \p E_BAD_STORAGE_TYPE.
1553  *
1554  * \sa osl_storage_type, osl_open_disk_object().
1555  */
1556 int osl_get_object(const struct osl_table *t, const struct osl_row *r,
1557         unsigned col_num, struct osl_object *object)
1558 {
1559         const struct osl_column_description *cd;
1560
1561         if (!t)
1562                 return -E_BAD_TABLE;
1563         cd = get_column_description(t->desc, col_num);
1564         /* col must not be disk storage */
1565         if (cd->storage_type == OSL_DISK_STORAGE)
1566                 return -E_BAD_STORAGE_TYPE;
1567         if (cd->storage_type == OSL_MAPPED_STORAGE)
1568                 return get_mapped_object(t, col_num, r->num, object);
1569         /* volatile */
1570         *object = r->volatile_objects[t->columns[col_num].volatile_num];
1571         return 1;
1572 }
1573
1574 static int mark_mapped_object_invalid(const struct osl_table *t,
1575                 uint32_t row_num, unsigned col_num)
1576 {
1577         struct osl_object obj;
1578         char *p;
1579         int ret = get_mapped_object(t, col_num, row_num, &obj);
1580
1581         if (ret < 0)
1582                 return ret;
1583         p = obj.data;
1584         p--;
1585         *p = 0xff;
1586         return 1;
1587 }
1588
1589 /**
1590  * Delete a row from an osl table.
1591  *
1592  * \param t Pointer to an open osl table.
1593  * \param row Pointer to the row to delete.
1594  *
1595  * This removes all disk storage objects, removes all rbtree nodes,  and frees
1596  * all volatile objects belonging to the given row. For mapped columns, the
1597  * data is merely marked invalid and may be pruned from time to time by
1598  * para_fsck.
1599  *
1600  * \return Positive on success, negative on errors. Possible errors include:
1601  * \p E_BAD_TABLE, errors returned by osl_get_object().
1602  */
1603 int osl_del_row(struct osl_table *t, struct osl_row *row)
1604 {
1605         struct osl_row *r = row;
1606         int i, ret;
1607         const struct osl_column_description *cd;
1608
1609         if (!t)
1610                 return -E_BAD_TABLE;
1611         PARA_INFO_LOG("deleting row %p\n", row);
1612
1613         if (t->num_disk_storage_columns) {
1614                 char *ds_name;
1615                 ret = disk_storage_name_of_row(t, r, &ds_name);
1616                 if (ret < 0)
1617                         goto out;
1618                 FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd)
1619                         delete_disk_storage_file(t, i, ds_name);
1620                 free(ds_name);
1621         }
1622         FOR_EACH_COLUMN(i, t->desc, cd) {
1623                 struct osl_column *col = t->columns + i;
1624                 enum osl_storage_type st = cd->storage_type;
1625                 remove_rb_node(t, i, r);
1626                 if (st == OSL_MAPPED_STORAGE) {
1627                         mark_mapped_object_invalid(t, r->num, i);
1628                         continue;
1629                 }
1630                 if (st == OSL_NO_STORAGE)
1631                         free(r->volatile_objects[col->volatile_num].data);
1632         }
1633         if (t->num_mapped_columns) {
1634                 ret = mark_row_invalid(t, r->num);
1635                 if (ret < 0)
1636                         goto out;
1637                 t->num_invalid_rows++;
1638         } else
1639                 t->num_rows--;
1640         ret = 1;
1641 out:
1642         free(r->volatile_objects);
1643         free(r);
1644         return ret;
1645 }
1646
1647 /* test if column has an rbtree */
1648 static int check_rbtree_col(const struct osl_table *t, unsigned col_num,
1649                 struct osl_column **col)
1650 {
1651         if (!t)
1652                 return -E_BAD_TABLE;
1653         if (!(get_column_description(t->desc, col_num)->storage_flags & OSL_RBTREE))
1654                 return -E_BAD_STORAGE_FLAGS;
1655         *col = t->columns + col_num;
1656         return 1;
1657 }
1658
1659 /**
1660  * Get the row that contains the given object.
1661  *
1662  * \param t Pointer to an open osl table.
1663  * \param col_num The number of the column to be searched.
1664  * \param obj The object to be looked up.
1665  * \param result Points to the row containing \a obj.
1666  *
1667  * Lookup \a obj in \a t and return the row containing \a obj. The column
1668  * specified by \a col_num must have an associated rbtree.
1669  *
1670  * \return Positive on success, negative on errors. If an error occured, \a
1671  * result is set to \p NULL. Possible errors include: \p E_BAD_TABLE, \p
1672  * E_BAD_STORAGE_FLAGS, errors returned by get_mapped_object(), \p
1673  * E_RB_KEY_NOT_FOUND.
1674  *
1675  * \sa osl_storage_flags
1676  */
1677 int osl_get_row(const struct osl_table *t, unsigned col_num,
1678                 const struct osl_object *obj, struct osl_row **result)
1679 {
1680         int ret;
1681         struct rb_node *node;
1682         struct osl_row *row;
1683         struct osl_column *col;
1684
1685         *result = NULL;
1686         ret = check_rbtree_col(t, col_num, &col);
1687         if (ret < 0)
1688                 return ret;
1689         ret = search_rbtree(obj, t, col_num, &node, NULL);
1690         if (ret < 0)
1691                 return ret;
1692         row = get_row_pointer(node, t->columns[col_num].rbtree_num);
1693         *result = row;
1694         return 1;
1695 }
1696
1697 static int rbtree_loop(struct osl_column *col,  void *private_data,
1698                 osl_rbtree_loop_func *func)
1699 {
1700         struct rb_node *n, *tmp;
1701
1702         /* this for-loop is safe against removal of an entry */
1703         for (n = rb_first(&col->rbtree), tmp = n? rb_next(n) : NULL;
1704                         n;
1705                         n = tmp, tmp = tmp? rb_next(tmp) : NULL) {
1706                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1707                 int ret = func(r, private_data);
1708                 if (ret < 0)
1709                         return ret;
1710         }
1711         return 1;
1712 }
1713
1714 static int rbtree_loop_reverse(struct osl_column *col,  void *private_data,
1715                 osl_rbtree_loop_func *func)
1716 {
1717         struct rb_node *n, *tmp;
1718
1719         /* safe against removal of an entry */
1720         for (n = rb_last(&col->rbtree), tmp = n? rb_prev(n) : NULL;
1721                         n;
1722                         n = tmp, tmp = tmp? rb_prev(tmp) : NULL) {
1723                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1724                 int ret = func(r, private_data);
1725                 if (ret < 0)
1726                         return ret;
1727         }
1728         return 1;
1729 }
1730
1731 /**
1732  * Loop over all nodes in an rbtree.
1733  *
1734  * \param t Pointer to an open osl table.
1735  * \param col_num The column to use for iterating over the elements.
1736  * \param private_data Pointer that gets passed to \a func.
1737  * \param func The function to be called for each node in the rbtree.
1738  *
1739  * This function does an in-order walk of the rbtree associated with \a
1740  * col_num. It is an error if the \p OSL_RBTREE flag is not set for this
1741  * column. For each node in the rbtree, the given function \a func is called
1742  * with two pointers as arguments: The first osl_row* argument points to the
1743  * row that contains the object corresponding to the rbtree node currently
1744  * traversed, and the \a private_data pointer is passed verbatim to \a func as the
1745  * second argument. The loop terminates either if \a func returns a negative
1746  * value, or if all nodes of the tree have been visited.
1747  *
1748  *
1749  * \return Positive on success, negative on errors. If the termination of the
1750  * loop was caused by \a func returning a negative value, this value is
1751  * returned.
1752  *
1753  * \sa osl_storage_flags, osl_rbtree_loop_reverse(), osl_compare_func.
1754  */
1755 int osl_rbtree_loop(const struct osl_table *t, unsigned col_num,
1756         void *private_data, osl_rbtree_loop_func *func)
1757 {
1758         struct osl_column *col;
1759
1760         int ret = check_rbtree_col(t, col_num, &col);
1761         if (ret < 0)
1762                 return ret;
1763         return rbtree_loop(col, private_data, func);
1764 }
1765
1766 /**
1767  * Loop over all nodes in an rbtree in reverse order.
1768  *
1769  * \param t Identical meaning as in \p osl_rbtree_loop().
1770  * \param col_num Identical meaning as in \p osl_rbtree_loop().
1771  * \param private_data Identical meaning as in \p osl_rbtree_loop().
1772  * \param func Identical meaning as in \p osl_rbtree_loop().
1773  *
1774  * This function is identical to \p osl_rbtree_loop(), the only difference
1775  * is that the tree is walked in reverse order.
1776  *
1777  * \return The same return value as \p osl_rbtree_loop().
1778  *
1779  * \sa osl_rbtree_loop().
1780  */
1781 int osl_rbtree_loop_reverse(const struct osl_table *t, unsigned col_num,
1782         void *private_data, osl_rbtree_loop_func *func)
1783 {
1784         struct osl_column *col;
1785
1786         int ret = check_rbtree_col(t, col_num, &col);
1787         if (ret < 0)
1788                 return ret;
1789         return rbtree_loop_reverse(col, private_data, func);
1790 }
1791
1792 /* TODO: Rollback changes on errors */
1793 static int rename_disk_storage_objects(struct osl_table *t,
1794                 struct osl_object *old_obj, struct osl_object *new_obj)
1795 {
1796         int i, ret;
1797         const struct osl_column_description *cd;
1798         char *old_ds_name, *new_ds_name;
1799
1800         if (!t->num_disk_storage_columns)
1801                 return 1; /* nothing to do */
1802         if (old_obj->size == new_obj->size && !memcmp(new_obj->data,
1803                         old_obj->data, new_obj->size))
1804                 return 1; /* object did not change */
1805         old_ds_name = disk_storage_name_of_object(t, old_obj);
1806         new_ds_name = disk_storage_name_of_object(t, new_obj);
1807         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1808                 char *old_filename, *new_filename;
1809                 ret = create_disk_storage_object_dir(t, i, new_ds_name);
1810                 if (ret < 0)
1811                         goto out;
1812                 old_filename = disk_storage_path(t, i, old_ds_name);
1813                 new_filename = disk_storage_path(t, i, new_ds_name);
1814                 ret = para_rename(old_filename, new_filename);
1815                 free(old_filename);
1816                 free(new_filename);
1817                 if (ret < 0)
1818                         goto out;
1819         }
1820         ret = 1;
1821 out:
1822         free(old_ds_name);
1823         free(new_ds_name);
1824         return ret;
1825
1826 }
1827
1828 /**
1829  * Change an object in an osl table.
1830  *
1831  * \param t Pointer to an open osl table.
1832  * \param r Pointer to the row containing the object to be updated.
1833  * \param col_num Number of the column containing the object to be updated.
1834  * \param obj Pointer to the replacement object.
1835  *
1836  * This function  gets rid of all references to the old object. This includes
1837  * removal of the rbtree node in case there is an rbtree associated with \a
1838  * col_num. It then inserts \a obj into the table and the rbtree if neccessary.
1839  *
1840  * If the \p OSL_RBTREE flag is set for \a col_num, you \b MUST call this
1841  * function in order to change the contents of an object, even for volatile or
1842  * mapped columns of constant size (which may be updated directly if \p
1843  * OSL_RBTREE is not set).  Otherwise the rbtree might become corrupted.
1844  *
1845  * \return Positive on success, negative on errors. Possible errors include: \p
1846  * E_BAD_TABLE, \p E_RB_KEY_EXISTS, \p E_BAD_SIZE, \p E_NOENT, \p E_UNLINK,
1847  * errors returned by para_write_file(), \p E_MKDIR.
1848  */
1849 int osl_update_object(struct osl_table *t, const struct osl_row *r,
1850                 unsigned col_num, struct osl_object *obj)
1851 {
1852         struct osl_column *col;
1853         const struct osl_column_description *cd;
1854         int ret;
1855
1856         if (!t)
1857                 return -E_BAD_TABLE;
1858         col = &t->columns[col_num];
1859         cd = get_column_description(t->desc, col_num);
1860         PARA_DEBUG_LOG("updating column %u of %s\n", col_num, t->desc->name);
1861         if (cd->storage_flags & OSL_RBTREE) {
1862                 if (search_rbtree(obj, t, col_num, NULL, NULL) > 0)
1863                         return -E_RB_KEY_EXISTS;
1864         }
1865         if (cd->storage_flags & OSL_FIXED_SIZE) {
1866                 if (obj->size != cd->data_size)
1867                         return -E_BAD_DATA_SIZE;
1868         }
1869         remove_rb_node(t, col_num, r);
1870         if (cd->storage_type == OSL_NO_STORAGE) { /* TODO: If fixed size, reuse object? */
1871                 free(r->volatile_objects[col->volatile_num].data);
1872                 r->volatile_objects[col->volatile_num] = *obj;
1873         } else if (cd->storage_type == OSL_DISK_STORAGE) {
1874                 char *ds_name;
1875                 ret = disk_storage_name_of_row(t, r, &ds_name);
1876                 if (ret < 0)
1877                         return ret;
1878                 ret = delete_disk_storage_file(t, col_num, ds_name);
1879                 if (ret < 0 && ret != -E_NOENT) {
1880                         free(ds_name);
1881                         return ret;
1882                 }
1883                 ret = write_disk_storage_file(t, col_num, obj, ds_name);
1884                 free(ds_name);
1885                 if (ret < 0)
1886                         return ret;
1887         } else { /* mapped storage */
1888                 struct osl_object old_obj;
1889                 ret = get_mapped_object(t, col_num, r->num, &old_obj);
1890                 if (ret < 0)
1891                         return ret;
1892                 /*
1893                  * If the updated column is the disk storage name column, the
1894                  * disk storage name changes, so we have to rename all disk
1895                  * storage objects accordingly.
1896                  */
1897                 if (col_num == t->disk_storage_name_column) {
1898                         ret = rename_disk_storage_objects(t, &old_obj, obj);
1899                         if (ret < 0)
1900                                 return ret;
1901                 }
1902                 if (cd->storage_flags & OSL_FIXED_SIZE)
1903                         memcpy(old_obj.data, obj->data, cd->data_size);
1904                 else { /* TODO: if the size doesn't change, use old space */
1905                         uint32_t new_data_map_size;
1906                         char *row_index;
1907                         ret = get_row_index(t, r->num, &row_index);
1908                         if (ret < 0)
1909                                 return ret;
1910                         ret = mark_mapped_object_invalid(t, r->num, col_num);
1911                         if (ret < 0)
1912                                 return ret;
1913                         unmap_column(t, col_num);
1914                         ret = append_map_file(t, col_num, obj,
1915                                 &new_data_map_size);
1916                         if (ret < 0)
1917                                 return ret;
1918                         ret = map_column(t, col_num);
1919                         if (ret < 0)
1920                                 return ret;
1921                         update_cell_index(row_index, col, new_data_map_size,
1922                                 obj->size);
1923                 }
1924         }
1925         if (cd->storage_flags & OSL_RBTREE) {
1926                 ret = insert_rbtree(t, col_num, r, obj);
1927                 if (ret < 0)
1928                         return ret;
1929         }
1930         return 1;
1931 }
1932
1933 /**
1934  * Retrieve an object of type \p OSL_DISK_STORAGE by row and column.
1935  *
1936  * \param t Pointer to an open osl table.
1937  * \param r Pointer to the row containing the object.
1938  * \param col_num The column number.
1939  * \param obj Points to the result upon successful return.
1940  *
1941  * For columns of type \p OSL_DISK_STORAGE, this function must be used to
1942  * retrieve one of its containing objects. Afterwards, osl_close_disk_object()
1943  * must be called in order to deallocate the resources.
1944  *
1945  * \return Positive on success, negative on errors. Possible errors include:
1946  * \p E_BAD_TABLE, \p E_BAD_STORAGE_TYPE, errors returned by osl_get_object().
1947  *
1948  * \sa osl_get_object(), osl_storage_type, osl_close_disk_object().
1949  */
1950 int osl_open_disk_object(const struct osl_table *t, const struct osl_row *r,
1951                 unsigned col_num, struct osl_object *obj)
1952 {
1953         const struct osl_column_description *cd;
1954         char *ds_name, *filename;
1955         int ret;
1956
1957         if (!t)
1958                 return -E_BAD_TABLE;
1959         cd = get_column_description(t->desc, col_num);
1960         if (cd->storage_type != OSL_DISK_STORAGE)
1961                 return -E_BAD_STORAGE_TYPE;
1962
1963         ret = disk_storage_name_of_row(t, r, &ds_name);
1964         if (ret < 0)
1965                 return ret;
1966         filename = disk_storage_path(t, col_num, ds_name);
1967         free(ds_name);
1968         PARA_DEBUG_LOG("filename: %s\n", filename);
1969         ret = mmap_full_file(filename, O_RDONLY, obj);
1970         free(filename);
1971         return ret;
1972 }
1973
1974 /**
1975  * Free resources that were allocated during osl_open_disk_object().
1976  *
1977  * \param obj Pointer to the object previously returned by open_disk_object().
1978  *
1979  * \return The return value of the underlying call to para_munmap().
1980  *
1981  * \sa para_munmap().
1982  */
1983 int osl_close_disk_object(struct osl_object *obj)
1984 {
1985         return para_munmap(obj->data, obj->size);
1986 }
1987
1988 /**
1989  * Get the number of rows of the given table.
1990  *
1991  * \param t Pointer to an open osl table.
1992  * \param num_rows Result is returned here.
1993  *
1994  * The number of rows returned via \a num_rows excluding any invalid rows.
1995  *
1996  * \return Positive on success, \p -E_BAD_TABLE if \a t is \p NULL.
1997  */
1998 int osl_get_num_rows(const struct osl_table *t, unsigned *num_rows)
1999 {
2000         if (!t)
2001                 return -E_BAD_TABLE;
2002         assert(t->num_rows >= t->num_invalid_rows);
2003         *num_rows = t->num_rows - t->num_invalid_rows;
2004         return 1;
2005 }
2006
2007 /**
2008  * Get the rank of a row.
2009  *
2010  * \param t An open osl table.
2011  * \param r The row to get the rank of.
2012  * \param col_num The number of an rbtree column.
2013  * \param rank Result pointer.
2014  *
2015  * The rank is, by definition, the position of the row in the linear order
2016  * determined by an inorder tree walk of the rbtree associated with column
2017  * number \a col_num of \a table.
2018  *
2019  * \return Positive on success, negative on errors.
2020  *
2021  * \sa osl_get_nth_row().
2022  */
2023 int osl_get_rank(const struct osl_table *t, struct osl_row *r,
2024                 unsigned col_num, unsigned *rank)
2025 {
2026         struct osl_object obj;
2027         struct osl_column *col;
2028         struct rb_node *node;
2029         int ret = check_rbtree_col(t, col_num, &col);
2030
2031         if (ret < 0)
2032                 return ret;
2033         ret = osl_get_object(t, r, col_num, &obj);
2034         if (ret < 0)
2035                 return ret;
2036         ret = search_rbtree(&obj, t, col_num, &node, NULL);
2037         if (ret < 0)
2038                 return ret;
2039         ret = rb_rank(node, rank);
2040         if (ret < 0)
2041                 return -E_BAD_ROW;
2042         return 1;
2043 }
2044
2045 /**
2046  * Get the row with n-th greatest value.
2047  *
2048  * \param t Pointer to an open osl table.
2049  * \param col_num The column number.
2050  * \param n The rank of the desired row.
2051  * \param result Row is returned here.
2052  *
2053  * Retrieve the n-th order statistic with respect to the compare function
2054  * of the rbtree column \a col_num. In other words, get that row with
2055  * \a n th greatest value in column \a col_num. It's an error if
2056  * \a col_num is not a rbtree column, or if \a n is larger than the
2057  * number of rows in the table.
2058  *
2059  * \return Positive on success, negative on errors. Possible errors:
2060  * \p E_BAD_TABLE, \p E_BAD_STORAGE_FLAGS, \p E_RB_KEY_NOT_FOUND.
2061  *
2062  * \sa osl_storage_flags, osl_compare_func, osl_get_row(),
2063  * osl_rbtree_last_row(), osl_rbtree_first_row(), osl_get_rank().
2064  */
2065 int osl_get_nth_row(const struct osl_table *t, unsigned col_num,
2066                 unsigned n, struct osl_row **result)
2067 {
2068         struct osl_column *col;
2069         struct rb_node *node;
2070         int ret = check_rbtree_col(t, col_num, &col);
2071
2072         if (ret < 0)
2073                 return ret;
2074         node = rb_nth(col->rbtree.rb_node, n);
2075         if (!node)
2076                 return -E_RB_KEY_NOT_FOUND;
2077         *result = get_row_pointer(node, col->rbtree_num);
2078         return 1;
2079 }
2080
2081 /**
2082  * Get the row corresponding to the smallest rbtree node of a column.
2083  *
2084  * \param t An open rbtree table.
2085  * \param col_num The number of the rbtree column.
2086  * \param result A pointer to the first row is returned here.
2087  *
2088  * The rbtree node of the smallest object (with respect to the corresponding
2089  * compare function) is selected and the row containing this object is
2090  * returned. It is an error if \a col_num refers to a column without an
2091  * associated rbtree.
2092  *
2093  * \return Positive on success, negative on errors.
2094  *
2095  * \sa osl_get_nth_row(), osl_rbtree_last_row().
2096  */
2097 int osl_rbtree_first_row(const struct osl_table *t, unsigned col_num,
2098                 struct osl_row **result)
2099 {
2100         return osl_get_nth_row(t, col_num, 1, result);
2101 }
2102
2103 /**
2104  * Get the row corresponding to the greatest rbtree node of a column.
2105  *
2106  * \param t The same meaning as in \p osl_rbtree_first_row().
2107  * \param col_num The same meaning as in \p osl_rbtree_first_row().
2108  * \param result The same meaning as in \p osl_rbtree_first_row().
2109  *
2110  * This function works just like osl_rbtree_first_row(), the only difference
2111  * is that the row containing the greatest rather than the smallest object is
2112  * returned.
2113  *
2114  * \return Positive on success, negative on errors.
2115  *
2116  * \sa osl_get_nth_row(), osl_rbtree_first_row().
2117  */
2118 int osl_rbtree_last_row(const struct osl_table *t, unsigned col_num,
2119                 struct osl_row **result)
2120 {
2121         unsigned num_rows;
2122         int ret = osl_get_num_rows(t, &num_rows);
2123
2124         if (ret < 0)
2125                 return ret;
2126         return osl_get_nth_row(t, col_num, num_rows, result);
2127 }