X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=osl.c;h=2519228890bc2c25f1b82da6887585afe42840ae;hp=0ac7ead0ffda5ed0925106de589b0bba5c41e706;hb=cd8bb5a2d51179a31842a2cd7012cad28deea78d;hpb=19711eb920137434104f561086d6ba79b6af5eae diff --git a/osl.c b/osl.c index 0ac7ead0..25192288 100644 --- a/osl.c +++ b/osl.c @@ -1,24 +1,23 @@ /* - * Copyright (C) 2007 Andre Noll + * Copyright (C) 2007-2008 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ /** \file osl.c Object storage layer functions. */ -#include "para.h" -#include "error.h" -#include "list.h" -#include "osl_core.h" #include /* readdir() */ #include -//#define FMT_OFF_T "%li" - +#include "para.h" +#include "error.h" +#include "fd.h" +#include "list.h" +#include "osl_core.h" /** * A wrapper for lseek(2). * - * \param fd The filedescriptor whose offset is to be to repositioned. + * \param fd The file descriptor whose offset is to be to repositioned. * \param offset A value-result parameter. * \param whence Usual repositioning directive. * @@ -41,14 +40,14 @@ int para_lseek(int fd, off_t *offset, int whence) } /** - * Waraper for the write system call. + * Wrapper for the write system call. * * \param fd The file descriptor to write to. * \param buf The buffer to write. * \param size The length of \a buf in bytes. * - * This function writes out the given bufffer and retries if an interrupt - * occured during the write. + * This function writes out the given buffer and retries if an interrupt + * occurred during the write. * * \return On success, the number of bytes written is returned, otherwise, the * function returns \p -E_WRITE. @@ -84,11 +83,11 @@ ssize_t para_write(int fd, const void *buf, size_t size) */ ssize_t para_write_all(int fd, const void *buf, size_t size) { - PARA_DEBUG_LOG("writing %zu bytes\n", size); +// PARA_DEBUG_LOG("writing %zu bytes\n", size); const char *b = buf; while (size) { ssize_t ret = para_write(fd, b, size); - PARA_DEBUG_LOG("ret: %zd\n", ret); +// PARA_DEBUG_LOG("ret: %zd\n", ret); if (ret < 0) return ret; b += ret; @@ -96,46 +95,6 @@ ssize_t para_write_all(int fd, const void *buf, size_t size) } return 1; } -/** - * Wrapper for the open(2) system call. - * - * \param path The filename. - * \param flags The usual open(2) flags. - * \param mode Specifies the permissions to use. - * - * The mode parameter must be specified when O_CREAT is in the flags, and is ignored - * otherwise. - * - * \return Positive on success, negative on errors. Possible errors: \p - * E_EXIST, \p E_ISDIR, \p E_NOENT, \p E_OSL_PERM. - * - * \sa open(2). - */ -int para_open(const char *path, int flags, mode_t mode) -{ - PARA_DEBUG_LOG("opening %s\n", path); - int ret = open(path, flags, mode); - - if (ret >= 0) - return ret; - switch (errno) { - case EEXIST: - ret = -E_EXIST; - break; - case EISDIR: - ret = -E_ISDIR; - break; - case ENOENT: - ret = -E_NOENT; - break; - case EPERM: - ret = -E_OSL_PERM; - break; - }; - PARA_ERROR_LOG("failed to open %s: %s\n", path, strerror(errno)); - return ret; -} - /** * Open a file, write the given buffer and close the file. * @@ -170,7 +129,7 @@ static int append_file(const char *filename, char *header, size_t header_size, { int ret, fd; - PARA_DEBUG_LOG("appending %zu + %zu bytes\n", header_size, data_size); +// PARA_DEBUG_LOG("appending %zu + %zu bytes\n", header_size, data_size); ret = para_open(filename, O_WRONLY | O_CREAT | O_APPEND, 0644); if (ret < 0) return ret; @@ -197,54 +156,6 @@ out: return ret; } -/** - * Map a file into memory. - * - * \param path Name of the regular file to map. - * \param open_mode Either \p O_RDONLY or \p O_RDWR. - * \param obj On success, the mapping is returned here. - * - * \return Positive on success, negative on errors. Possible errors include: \p - * E_FSTAT, any errors returned by para_open(), \p E_EMPTY, \p E_MMAP. - * - * \sa para_open(), mmap(2). - */ -int mmap_full_file(const char *path, int open_mode, struct osl_object *obj) -{ - int fd, ret, mmap_prot, mmap_flags; - struct stat file_status; - - if (open_mode == O_RDONLY) { - mmap_prot = PROT_READ; - mmap_flags = MAP_PRIVATE; - } else { - mmap_prot = PROT_READ | PROT_WRITE; - mmap_flags = MAP_SHARED; - } - ret = para_open(path, open_mode, 0); - if (ret < 0) - return ret; - fd = ret; - ret = -E_FSTAT; - if (fstat(fd, &file_status) < 0) - goto out; - obj->size = file_status.st_size; - ret = -E_EMPTY; - PARA_DEBUG_LOG("%s: size %zu\n", path, obj->size); - if (!obj->size) - goto out; - obj->data = mmap(NULL, obj->size, mmap_prot, mmap_flags, fd, 0); - if (obj->data == MAP_FAILED) { - obj->data = NULL; - ret = -E_MMAP; - goto out; - } - ret = 1; -out: - close(fd); - return ret; -} - /** * Traverse the given directory recursively. * @@ -252,9 +163,10 @@ out: * \param func The function to call for each entry. * \param private_data Pointer to an arbitrary data structure. * - * For each regular file in \a dirname, the supplied function \a func is + * For each regular file under \a dirname, the supplied function \a func is * called. The full path of the regular file and the \a private_data pointer - * are passed to \a func. + * are passed to \a func. Directories for which the calling process has no + * permissions to change to are silently ignored. * * \return On success, 1 is returned. Otherwise, this function returns a * negative value which indicates the kind of the error. @@ -262,37 +174,22 @@ out: int for_each_file_in_dir(const char *dirname, int (*func)(const char *, const void *), const void *private_data) { - DIR *dir = NULL; + DIR *dir; struct dirent *entry; - /* - * Opening the current directory (".") and calling fchdir() to return - * is usually faster and more reliable than saving cwd in some buffer - * and calling chdir() afterwards (see man 3 getcwd). - */ - int cwd_fd = open(".", O_RDONLY); - struct stat s; - int ret = -1; - -// PARA_DEBUG_LOG("dirname: %s\n", dirname); - if (cwd_fd < 0) - return -E_OSL_GETCWD; - ret = -E_OSL_CHDIR; - if (chdir(dirname) < 0) - goto out; - ret = -E_OSL_OPENDIR; - dir = opendir("."); - if (!dir) - goto out; + int cwd_fd, ret2, ret = para_opendir(dirname, &dir, &cwd_fd); + + if (ret < 0) + return ret == -ERRNO_TO_PARA_ERROR(EACCES)? 1 : ret; /* scan cwd recursively */ while ((entry = readdir(dir))) { mode_t m; char *tmp; + struct stat s; if (!strcmp(entry->d_name, ".")) continue; if (!strcmp(entry->d_name, "..")) continue; - ret = -E_OSL_LSTAT; if (lstat(entry->d_name, &s) == -1) continue; m = s.st_mode; @@ -314,41 +211,26 @@ int for_each_file_in_dir(const char *dirname, } ret = 1; out: - if (dir) - closedir(dir); - if (fchdir(cwd_fd) < 0 && ret >= 0) - ret = -E_OSL_CHDIR; + closedir(dir); + ret2 = para_fchdir(cwd_fd); + if (ret2 < 0 && ret >= 0) + ret = ret2; close(cwd_fd); return ret; } -int para_mkdir(const char *path, mode_t mode) -{ - if (!mkdir(path, mode)) - return 1; - if (errno == EEXIST) - return -E_EXIST; - if (errno == ENOSPC) - return -E_NOSPC; - if (errno == ENOTDIR) - return -E_NOTDIR; - if (errno == EPERM) - return E_OSL_PERM; - return -E_MKDIR; -} - -static int verify_basename(const char *name) +static int verify_name(const char *name) { if (!name) - return -E_BAD_BASENAME; + return -E_BAD_NAME; if (!*name) - return -E_BAD_BASENAME; + return -E_BAD_NAME; if (strchr(name, '/')) - return -E_BAD_BASENAME; + return -E_BAD_NAME; if (!strcmp(name, "..")) - return -E_BAD_BASENAME; + return -E_BAD_NAME; if (!strcmp(name, ".")) - return -E_BAD_BASENAME; + return -E_BAD_NAME; return 1; } @@ -423,7 +305,7 @@ static int disk_storage_name_of_row(const struct osl_table *t, static void column_name_hash(const char *col_name, HASH_TYPE *hash) { - return hash_function(col_name, strlen(col_name), hash); + hash_function(col_name, strlen(col_name), hash); } static int init_column_descriptions(struct osl_table *t) @@ -432,11 +314,11 @@ static int init_column_descriptions(struct osl_table *t) const struct osl_column_description *cd; ret = -E_BAD_TABLE_DESC; - ret = verify_basename(t->desc->name); + ret = verify_name(t->desc->name); if (ret < 0) goto err; ret = -E_BAD_DB_DIR; - if (!t->desc->dir) + if (!t->desc->dir && (t->num_disk_storage_columns || t->num_mapped_columns)) goto err; /* the size of the index header without column descriptions */ t->index_header_size = IDX_COLUMN_DESCRIPTIONS; @@ -451,7 +333,7 @@ static int init_column_descriptions(struct osl_table *t) ret = -E_NO_COLUMN_NAME; if (!cd->name || !cd->name[0]) goto err; - ret = verify_basename(cd->name); + ret = verify_name(cd->name); if (ret < 0) goto err; t->index_header_size += index_column_description_size(cd->name); @@ -493,10 +375,10 @@ int init_table_structure(const struct osl_table_description *desc, struct osl_table *t = para_calloc(sizeof(*t)); int i, ret = -E_BAD_TABLE_DESC, have_disk_storage_name_column = 0; - PARA_INFO_LOG("creating table structure for '%s' from table " - "description\n", desc->name); if (!desc) goto err; + PARA_DEBUG_LOG("creating table structure for '%s' from table " + "description\n", desc->name); ret = -E_NO_COLUMN_DESC; if (!desc->column_descriptions) goto err; @@ -526,8 +408,8 @@ int init_table_structure(const struct osl_table_description *desc, break; case OSL_MAPPED_STORAGE: t->num_mapped_columns++; - col->index_offset = t->index_entry_size; - t->index_entry_size += 8; + col->index_offset = t->row_index_size; + t->row_index_size += 8; break; case OSL_NO_STORAGE: col->volatile_num = t->num_volatile_columns; @@ -551,7 +433,7 @@ int init_table_structure(const struct osl_table_description *desc, if (!t->num_rbtrees) goto err; /* success */ - PARA_INFO_LOG("OK. Index entry size: %u\n", t->index_entry_size); + PARA_DEBUG_LOG("OK. Index entry size: %u\n", t->row_index_size); ret = init_column_descriptions(t); if (ret < 0) goto err; @@ -655,7 +537,7 @@ static int compare_table_descriptions(struct osl_table *t) ret = -E_BAD_TABLE_FLAGS; if (desc.flags != t->desc->flags) goto out; - ret = E_BAD_COLUMN_NUM; + ret = -E_BAD_COLUMN_NUM; if (desc.num_columns != t->desc->num_columns) goto out; FOR_EACH_COLUMN(i, t->desc, cd1) { @@ -677,7 +559,7 @@ static int compare_table_descriptions(struct osl_table *t) if (strcmp(cd1->name, cd2->name)) goto out; } - PARA_INFO_LOG("table description of '%s' matches on-disk data, good\n", + PARA_DEBUG_LOG("table description of '%s' matches on-disk data, good\n", t->desc->name); ret = 1; out: @@ -729,10 +611,7 @@ static int create_table_index(struct osl_table *t) * * \param desc Pointer to the table description. * - * \return Positive on success, negative on errors. Possible errors include: \p - * E_BAD_TABLE_DESC, \p E_BAD_DB_DIR, \p E_BAD_BASENAME, \p E_NO_COMPARE_FUNC, \p - * E_NO_COLUMN_NAME, \p E_DUPLICATE_COL_NAME, \p E_MKDIR, any errors returned - * by para_open(). + * \return Standard. */ int osl_create_table(const struct osl_table_description *desc) { @@ -749,7 +628,7 @@ int osl_create_table(const struct osl_table_description *desc) continue; if (!table_dir) { ret = para_mkdir(desc->dir, 0777); - if (ret < 0 && ret != -E_EXIST) + if (ret < 0 && !is_errno(-ret, EEXIST)) goto out; table_dir = make_message("%s/%s", desc->dir, desc->name); @@ -806,14 +685,24 @@ static void mark_table_clean(struct osl_table *t) write_u8(buf, read_u8(buf) & 0xfe); } +static void unmap_column(struct osl_table *t, unsigned col_num) +{ + struct osl_object map = t->columns[col_num].data_map; + int ret; + if (!map.data) + return; + ret = para_munmap(map.data, map.size); + assert(ret > 0); + map.data = NULL; +} + /** * Unmap all mapped files of an osl table. * * \param t Pointer to a mapped table. * \param flags Options for unmapping. * - * \return Positive on success, negative on errors. Possible errors include: - * E_NOT_MAPPED, E_MUNMAP. + * \return Positive on success, negative on errors. * * \sa map_table(), enum osl_close_flags, para_munmap(). */ @@ -825,7 +714,7 @@ int unmap_table(struct osl_table *t, enum osl_close_flags flags) if (!t->num_mapped_columns) /* can this ever happen? */ return 1; - PARA_INFO_LOG("unmapping table '%s'\n", t->desc->name); + PARA_DEBUG_LOG("unmapping table '%s'\n", t->desc->name); if (!t->index_map.data) return -E_NOT_MAPPED; if (flags & OSL_MARK_CLEAN) @@ -836,18 +725,32 @@ int unmap_table(struct osl_table *t, enum osl_close_flags flags) t->index_map.data = NULL; if (!t->num_rows) return 1; - FOR_EACH_MAPPED_COLUMN(i, t, cd) { - struct osl_object map = t->columns[i].data_map; - if (!map.data) - continue; - ret = para_munmap(map.data, map.size); - if (ret < 0) - return ret; - map.data = NULL; - } + FOR_EACH_MAPPED_COLUMN(i, t, cd) + unmap_column(t, i); return 1; } +static int map_column(struct osl_table *t, unsigned col_num) +{ + struct stat statbuf; + char *filename = column_filename(t, col_num); + int ret = -E_STAT; + if (stat(filename, &statbuf) < 0) { + free(filename); + return ret; + } + if (!(S_IFREG & statbuf.st_mode)) { + free(filename); + return ret; + } + ret = mmap_full_file(filename, O_RDWR, + &t->columns[col_num].data_map.data, + &t->columns[col_num].data_map.size, + NULL); + free(filename); + return ret; +} + /** * Map the index file and all columns of type \p OSL_MAPPED_STORAGE into memory. * @@ -872,7 +775,7 @@ int map_table(struct osl_table *t, enum map_table_flags flags) filename = index_filename(t->desc); PARA_DEBUG_LOG("mapping table '%s' (index: %s)\n", t->desc->name, filename); ret = mmap_full_file(filename, flags & MAP_TBL_FL_MAP_RDONLY? - O_RDONLY : O_RDWR, &t->index_map); + O_RDONLY : O_RDWR, &t->index_map.data, &t->index_map.size, NULL); free(filename); if (ret < 0) return ret; @@ -894,20 +797,7 @@ int map_table(struct osl_table *t, enum map_table_flags flags) return num_rows; /* map data files */ FOR_EACH_MAPPED_COLUMN(i, t, cd) { - struct stat statbuf; - filename = column_filename(t, i); - ret = -E_STAT; - if (stat(filename, &statbuf) < 0) { - free(filename); - goto err; - } - if (!(S_IFREG & statbuf.st_mode)) { - free(filename); - goto err; - } - ret = mmap_full_file(filename, O_RDWR, - &t->columns[i].data_map); - free(filename); + ret = map_column(t, i); if (ret < 0) goto err; } @@ -935,7 +825,7 @@ err: /* unmap what is already mapped */ * of storage type \p OSL_MAPPED_STORAGE. * * \return Positive on success, negative on errors. Possible errors include: - * \p E_BAD_ID, \p E_INVALID_OBJECT. + * \p E_BAD_ROW_NUM, \p E_INVALID_OBJECT. * * \sa osl_storage_type. */ @@ -945,17 +835,16 @@ int get_mapped_object(const struct osl_table *t, unsigned col_num, struct osl_column *col = &t->columns[col_num]; uint32_t offset; char *header; - char *index_entry; + char *cell_index; int ret; if (t->num_rows <= row_num) - return -E_BAD_ID; - ret = get_index_entry(t, row_num, col_num, &index_entry); + return -E_BAD_ROW_NUM; + ret = get_cell_index(t, row_num, col_num, &cell_index); if (ret < 0) return ret; - offset = read_u32(index_entry); - obj->size = read_u32(index_entry + 4) - 1; - PARA_DEBUG_LOG("index_entry: %p\n", index_entry); + offset = read_u32(cell_index); + obj->size = read_u32(cell_index + 4) - 1; header = col->data_map.data + offset; obj->data = header + 1; if (read_u8(header) == 0xff) { @@ -963,8 +852,6 @@ int get_mapped_object(const struct osl_table *t, unsigned col_num, obj->size, offset); return -E_INVALID_OBJECT; } - PARA_DEBUG_LOG("mapped obj row_num: %u, col %u, size: %zu\n", row_num, - col_num, obj->size); return 1; } @@ -984,7 +871,7 @@ static int search_rbtree(const struct osl_object *obj, struct osl_object this_obj; parent = *new; if (st == OSL_MAPPED_STORAGE) { - ret = get_mapped_object(t, col_num, this_row->id, + ret = get_mapped_object(t, col_num, this_row->num, &this_obj); if (ret < 0) return ret; @@ -1054,7 +941,7 @@ static void remove_rb_node(struct osl_table *t, unsigned col_num, rb_erase(victim, &col->rbtree); } -static int add_row_to_rbtrees(struct osl_table *t, uint32_t id, +static int add_row_to_rbtrees(struct osl_table *t, uint32_t row_num, struct osl_object *volatile_objs, struct osl_row **row_ptr) { unsigned i; @@ -1062,20 +949,18 @@ static int add_row_to_rbtrees(struct osl_table *t, uint32_t id, struct osl_row *row = allocate_row(t->num_rbtrees); const struct osl_column_description *cd; - PARA_DEBUG_LOG("row: %p, id: %u\n", row, id); - row->id = id; + row->num = row_num; row->volatile_objects = volatile_objs; FOR_EACH_RBTREE_COLUMN(i, t, cd) { if (cd->storage_type == OSL_MAPPED_STORAGE) { struct osl_object obj; - ret = get_mapped_object(t, i, id, &obj); + ret = get_mapped_object(t, i, row_num, &obj); if (ret < 0) goto err; ret = insert_rbtree(t, i, row, &obj); } else { /* volatile */ const struct osl_object *obj = volatile_objs + t->columns[i].volatile_num; - PARA_DEBUG_LOG("inserting %p\n", obj->data); ret = insert_rbtree(t, i, row, obj); } if (ret < 0) @@ -1109,8 +994,14 @@ static void free_volatile_objects(const struct osl_table *t, for (n = rb_first(&rb_col->rbtree); n; n = rb_next(n)) { struct osl_row *r = get_row_pointer(n, rb_col->rbtree_num); if (flags & OSL_FREE_VOLATILE) - for (j = 0; j < t->num_volatile_columns; j++) - free(r->volatile_objects[j].data); + FOR_EACH_VOLATILE_COLUMN(j, t, cd) { + if (cd->storage_flags & OSL_DONT_FREE) + continue; + free(r->volatile_objects[ + t->columns[j].volatile_num].data); + } +// for (j = 0; j < t->num_volatile_columns; j++) +// free(r->volatile_objects[j].data); free(r->volatile_objects); } } @@ -1152,7 +1043,7 @@ void clear_rbtrees(struct osl_table *t) * \param flags Options for what should be cleaned up. * * If osl_open_table() succeeds, the resulting table pointer must later be - * passed to this function in order to flush all changes to the filesystem and + * passed to this function in order to flush all changes to the file system and * to free the resources that were allocated by osl_open_table(). * * \return Positive on success, negative on errors. Possible errors: \p E_BAD_TABLE, @@ -1190,13 +1081,13 @@ int osl_close_table(struct osl_table *t, enum osl_close_flags flags) */ int row_is_invalid(struct osl_table *t, uint32_t row_num) { - char *index_entry; - int i, ret = get_index_entry_start(t, row_num, &index_entry); + char *row_index; + int i, ret = get_row_index(t, row_num, &row_index); if (ret < 0) return ret; - for (i = 0; i < t->index_entry_size; i++) { - if ((unsigned char)index_entry[i] != 0xff) + for (i = 0; i < t->row_index_size; i++) { + if ((unsigned char)row_index[i] != 0xff) return 0; } PARA_INFO_LOG("row %d is invalid\n", row_num); @@ -1216,14 +1107,13 @@ int row_is_invalid(struct osl_table *t, uint32_t row_num) */ int mark_row_invalid(struct osl_table *t, uint32_t row_num) { - char *index_entry; - int i, ret = get_index_entry_start(t, row_num, &index_entry); + char *row_index; + int ret = get_row_index(t, row_num, &row_index); - PARA_INFO_LOG("marking row %d as invalid\n", row_num); if (ret < 0) return ret; - for (i = 0; i < t->index_entry_size; i++) - index_entry[i] = 0xff; + PARA_INFO_LOG("marking row %d as invalid\n", row_num); + memset(row_index, 0xff, t->row_index_size); return 1; } @@ -1270,10 +1160,7 @@ int init_rbtrees(struct osl_table *t) * The table description given by \a desc should coincide with the * description used at creation time. * - * \return Positive on success, negative on errors. Possible errors include: - * errors returned by init_table_structure(), \p E_NOENT, \p E_STAT, \p \p - * E_NOTDIR, \p E_BAD_TABLE_DESC, \p E_BAD_DB_DIR, \p E_NO_COMPARE_FUNC, \p - * E_NO_COLUMN_NAME, errors returned by init_rbtrees(). + * \return Standard. */ int osl_open_table(const struct osl_table_description *table_desc, struct osl_table **result) @@ -1293,13 +1180,10 @@ int osl_open_table(const struct osl_table_description *table_desc, ret = stat(dirname, &statbuf); free(dirname); if (ret < 0) { - if (errno == ENOENT) - ret = -E_NOENT; - else - ret = -E_STAT; + ret = -ERRNO_TO_PARA_ERROR(errno); goto err; } - ret = -E_NOTDIR; + ret = -ERRNO_TO_PARA_ERROR(ENOTDIR); if (!S_ISDIR(statbuf.st_mode)) goto err; } @@ -1332,7 +1216,7 @@ static int create_disk_storage_object_dir(const struct osl_table *t, dirname = disk_storage_dirname(t, col_num, ds_name); ret = para_mkdir(dirname, 0777); free(dirname); - if (ret < 0 && ret != -E_EXIST) + if (ret < 0 && !is_errno(-ret, EEXIST)) return ret; return 1; } @@ -1359,14 +1243,14 @@ static int append_map_file(const struct osl_table *t, unsigned col_num, int ret; char header = 0; /* zero means valid object */ - PARA_DEBUG_LOG("appending %zu + 1 byte\n", obj->size); +// PARA_DEBUG_LOG("appending %zu + 1 byte\n", obj->size); ret = append_file(filename, &header, 1, obj->data, obj->size, new_size); free(filename); return ret; } -static int append_index_entry(const struct osl_table *t, char *new_index_entry) +static int append_row_index(const struct osl_table *t, char *row_index) { char *filename; int ret; @@ -1374,9 +1258,8 @@ static int append_index_entry(const struct osl_table *t, char *new_index_entry) if (!t->num_mapped_columns) return 1; filename = index_filename(t->desc); - PARA_DEBUG_LOG("appending %u bytes\n", t->index_entry_size); - ret = append_file(filename, NULL, 0, new_index_entry, - t->index_entry_size, NULL); + ret = append_file(filename, NULL, 0, row_index, + t->row_index_size, NULL); free(filename); return ret; } @@ -1426,15 +1309,11 @@ static int delete_disk_storage_file(const struct osl_table *t, unsigned col_num, const char *ds_name) { char *dirname, *filename = disk_storage_path(t, col_num, ds_name); - int ret = unlink(filename); + int ret = unlink(filename), err = errno; - PARA_INFO_LOG("deleted %s\n", filename); free(filename); - if (ret < 0) { - if (errno == ENOENT) - return -E_NOENT; - return -E_UNLINK; - } + if (ret < 0) + return -ERRNO_TO_PARA_ERROR(err); if (!(t->desc->flags & OSL_LARGE_TABLE)) return 1; dirname = disk_storage_dirname(t, col_num, ds_name); @@ -1474,7 +1353,7 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, int i, ret; char *ds_name = NULL; struct rb_node **rb_parents = NULL, ***rb_links = NULL; - char *new_index_entry = NULL; + char *new_row_index = NULL; struct osl_object *volatile_objs = NULL; const struct osl_column_description *cd; @@ -1483,10 +1362,10 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, rb_parents = para_malloc(t->num_rbtrees * sizeof(struct rn_node*)); rb_links = para_malloc(t->num_rbtrees * sizeof(struct rn_node**)); if (t->num_mapped_columns) - new_index_entry = para_malloc(t->index_entry_size); + new_row_index = para_malloc(t->row_index_size); /* pass 1: sanity checks */ - PARA_DEBUG_LOG("sanity tests: %p:%p\n", objects[0].data, - objects[1].data); +// PARA_DEBUG_LOG("sanity tests: %p:%p\n", objects[0].data, +// objects[1].data); FOR_EACH_COLUMN(i, t->desc, cd) { enum osl_storage_type st = cd->storage_type; enum osl_storage_flags sf = cd->storage_flags; @@ -1499,16 +1378,16 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, if (sf & OSL_RBTREE) { unsigned rbtree_num = t->columns[i].rbtree_num; ret = -E_RB_KEY_EXISTS; - PARA_DEBUG_LOG("checking whether %p exists\n", - objects[i].data); +// PARA_DEBUG_LOG("checking whether %p exists\n", +// objects[i].data); if (search_rbtree(objects + i, t, i, &rb_parents[rbtree_num], &rb_links[rbtree_num]) > 0) goto out; } if (sf & OSL_FIXED_SIZE) { - PARA_DEBUG_LOG("fixed size. need: %zu, have: %d\n", - objects[i].size, cd->data_size); +// PARA_DEBUG_LOG("fixed size. need: %zu, have: %d\n", +// objects[i].size, cd->data_size); ret = -E_BAD_DATA_SIZE; if (objects[i].size != cd->data_size) goto out; @@ -1520,7 +1399,7 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, ret = unmap_table(t, OSL_MARK_CLEAN); if (ret < 0) goto out; - PARA_DEBUG_LOG("sanity tests passed%s\n", ""); +// PARA_DEBUG_LOG("sanity tests passed%s\n", ""); /* pass 2: create data files, append map data */ FOR_EACH_COLUMN(i, t->desc, cd) { enum osl_storage_type st = cd->storage_type; @@ -1529,12 +1408,12 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, if (st == OSL_MAPPED_STORAGE) { uint32_t new_size; struct osl_column *col = &t->columns[i]; - PARA_DEBUG_LOG("appending object of size %zu\n", - objects[i].size); +// PARA_DEBUG_LOG("appending object of size %zu\n", +// objects[i].size); ret = append_map_file(t, i, objects + i, &new_size); if (ret < 0) goto rollback; - update_index_entry(new_index_entry, col, new_size, + update_cell_index(new_row_index, col, new_size, objects[i].size); continue; } @@ -1543,13 +1422,13 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, if (ret < 0) goto rollback; } - ret = append_index_entry(t, new_index_entry); + ret = append_row_index(t, new_row_index); if (ret < 0) goto rollback; ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX); if (ret < 0) { /* truncate index and rollback changes */ char *filename = index_filename(t->desc); - para_truncate(filename, t->index_entry_size); + para_truncate(filename, t->row_index_size); free(filename); goto rollback; } @@ -1561,11 +1440,11 @@ int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects, volatile_objs[t->columns[i].volatile_num] = objects[i]; } t->num_rows++; - PARA_DEBUG_LOG("adding new entry as row #%d\n", t->num_rows - 1); +// PARA_DEBUG_LOG("adding new entry as row #%d\n", t->num_rows - 1); ret = add_row_to_rbtrees(t, t->num_rows - 1, volatile_objs, row); if (ret < 0) goto out; - PARA_DEBUG_LOG("added new entry as row #%d\n", t->num_rows - 1); +// PARA_DEBUG_LOG("added new entry as row #%d\n", t->num_rows - 1); ret = 1; goto out; rollback: /* rollback all changes made, ignore further errors */ @@ -1583,7 +1462,7 @@ rollback: /* rollback all changes made, ignore further errors */ /* ignore error and return previous error value */ map_table(t, MAP_TBL_FL_VERIFY_INDEX); out: - free(new_index_entry); + free(new_row_index); free(ds_name); free(rb_parents); free(rb_links); @@ -1634,18 +1513,18 @@ int osl_get_object(const struct osl_table *t, const struct osl_row *r, if (cd->storage_type == OSL_DISK_STORAGE) return -E_BAD_STORAGE_TYPE; if (cd->storage_type == OSL_MAPPED_STORAGE) - return get_mapped_object(t, col_num, r->id, object); + return get_mapped_object(t, col_num, r->num, object); /* volatile */ *object = r->volatile_objects[t->columns[col_num].volatile_num]; return 1; } -static int mark_mapped_object_invalid(const struct osl_table *t, uint32_t id, - unsigned col_num) +static int mark_mapped_object_invalid(const struct osl_table *t, + uint32_t row_num, unsigned col_num) { struct osl_object obj; char *p; - int ret = get_mapped_object(t, col_num, id, &obj); + int ret = get_mapped_object(t, col_num, row_num, &obj); if (ret < 0) return ret; @@ -1693,14 +1572,14 @@ int osl_del_row(struct osl_table *t, struct osl_row *row) enum osl_storage_type st = cd->storage_type; remove_rb_node(t, i, r); if (st == OSL_MAPPED_STORAGE) { - mark_mapped_object_invalid(t, r->id, i); + mark_mapped_object_invalid(t, r->num, i); continue; } - if (st == OSL_NO_STORAGE) + if (st == OSL_NO_STORAGE && !(cd->storage_flags & OSL_DONT_FREE)) free(r->volatile_objects[col->volatile_num].data); } if (t->num_mapped_columns) { - ret = mark_row_invalid(t, r->id); + ret = mark_row_invalid(t, r->num); if (ret < 0) goto out; t->num_invalid_rows++; @@ -1736,7 +1615,7 @@ static int check_rbtree_col(const struct osl_table *t, unsigned col_num, * Lookup \a obj in \a t and return the row containing \a obj. The column * specified by \a col_num must have an associated rbtree. * - * \return Positive on success, negative on errors. If an error occured, \a + * \return Positive on success, negative on errors. If an error occurred, \a * result is set to \p NULL. Possible errors include: \p E_BAD_TABLE, \p * E_BAD_STORAGE_FLAGS, errors returned by get_mapped_object(), \p * E_RB_KEY_NOT_FOUND. @@ -1766,9 +1645,12 @@ int osl_get_row(const struct osl_table *t, unsigned col_num, static int rbtree_loop(struct osl_column *col, void *private_data, osl_rbtree_loop_func *func) { - struct rb_node *n; + struct rb_node *n, *tmp; - for (n = rb_first(&col->rbtree); n; n = rb_next(n)) { + /* this for-loop is safe against removal of an entry */ + for (n = rb_first(&col->rbtree), tmp = n? rb_next(n) : NULL; + n; + n = tmp, tmp = tmp? rb_next(tmp) : NULL) { struct osl_row *r = get_row_pointer(n, col->rbtree_num); int ret = func(r, private_data); if (ret < 0) @@ -1780,9 +1662,12 @@ static int rbtree_loop(struct osl_column *col, void *private_data, static int rbtree_loop_reverse(struct osl_column *col, void *private_data, osl_rbtree_loop_func *func) { - struct rb_node *n; + struct rb_node *n, *tmp; - for (n = rb_last(&col->rbtree); n; n = rb_prev(n)) { + /* safe against removal of an entry */ + for (n = rb_last(&col->rbtree), tmp = n? rb_prev(n) : NULL; + n; + n = tmp, tmp = tmp? rb_prev(tmp) : NULL) { struct osl_row *r = get_row_pointer(n, col->rbtree_num); int ret = func(r, private_data); if (ret < 0) @@ -1802,9 +1687,9 @@ static int rbtree_loop_reverse(struct osl_column *col, void *private_data, * This function does an in-order walk of the rbtree associated with \a * col_num. It is an error if the \p OSL_RBTREE flag is not set for this * column. For each node in the rbtree, the given function \a func is called - * with two \p void* pointers as arguments: The first argument points to the + * with two pointers as arguments: The first osl_row* argument points to the * row that contains the object corresponding to the rbtree node currently - * traversed, and the \a private_data pointer is passed to \a func as the + * traversed, and the \a private_data pointer is passed verbatim to \a func as the * second argument. The loop terminates either if \a func returns a negative * value, or if all nodes of the tree have been visited. * @@ -1898,16 +1783,14 @@ out: * * This function gets rid of all references to the old object. This includes * removal of the rbtree node in case there is an rbtree associated with \a - * col_num. It then inserts \a obj into the table and the rbtree if neccessary. + * col_num. It then inserts \a obj into the table and the rbtree if necessary. * * If the \p OSL_RBTREE flag is set for \a col_num, you \b MUST call this * function in order to change the contents of an object, even for volatile or * mapped columns of constant size (which may be updated directly if \p * OSL_RBTREE is not set). Otherwise the rbtree might become corrupted. * - * \return Positive on success, negative on errors. Possible errors include: \p - * E_BAD_TABLE, \p E_RB_KEY_EXISTS, \p E_BAD_SIZE, \p E_NOENT, \p E_UNLINK, - * errors returned by para_write_file(), \p E_MKDIR. + * \return Standard */ int osl_update_object(struct osl_table *t, const struct osl_row *r, unsigned col_num, struct osl_object *obj) @@ -1920,13 +1803,14 @@ int osl_update_object(struct osl_table *t, const struct osl_row *r, return -E_BAD_TABLE; col = &t->columns[col_num]; cd = get_column_description(t->desc, col_num); + PARA_DEBUG_LOG("updating column %u of %s\n", col_num, t->desc->name); if (cd->storage_flags & OSL_RBTREE) { if (search_rbtree(obj, t, col_num, NULL, NULL) > 0) return -E_RB_KEY_EXISTS; } if (cd->storage_flags & OSL_FIXED_SIZE) { if (obj->size != cd->data_size) - return -E_BAD_SIZE; + return -E_BAD_DATA_SIZE; } remove_rb_node(t, col_num, r); if (cd->storage_type == OSL_NO_STORAGE) { /* TODO: If fixed size, reuse object? */ @@ -1938,7 +1822,7 @@ int osl_update_object(struct osl_table *t, const struct osl_row *r, if (ret < 0) return ret; ret = delete_disk_storage_file(t, col_num, ds_name); - if (ret < 0 && ret != -E_NOENT) { + if (ret < 0 && !is_errno(-ret, ENOENT)) { free(ds_name); return ret; } @@ -1948,7 +1832,7 @@ int osl_update_object(struct osl_table *t, const struct osl_row *r, return ret; } else { /* mapped storage */ struct osl_object old_obj; - ret = get_mapped_object(t, col_num, r->id, &old_obj); + ret = get_mapped_object(t, col_num, r->num, &old_obj); if (ret < 0) return ret; /* @@ -1965,18 +1849,22 @@ int osl_update_object(struct osl_table *t, const struct osl_row *r, memcpy(old_obj.data, obj->data, cd->data_size); else { /* TODO: if the size doesn't change, use old space */ uint32_t new_data_map_size; - char *index_entry; - ret = get_index_entry_start(t, r->id, &index_entry); + char *row_index; + ret = get_row_index(t, r->num, &row_index); if (ret < 0) return ret; - ret = mark_mapped_object_invalid(t, r->id, col_num); + ret = mark_mapped_object_invalid(t, r->num, col_num); if (ret < 0) return ret; + unmap_column(t, col_num); ret = append_map_file(t, col_num, obj, &new_data_map_size); if (ret < 0) return ret; - update_index_entry(index_entry, col, new_data_map_size, + ret = map_column(t, col_num); + if (ret < 0) + return ret; + update_cell_index(row_index, col, new_data_map_size, obj->size); } } @@ -2024,7 +1912,7 @@ int osl_open_disk_object(const struct osl_table *t, const struct osl_row *r, filename = disk_storage_path(t, col_num, ds_name); free(ds_name); PARA_DEBUG_LOG("filename: %s\n", filename); - ret = mmap_full_file(filename, O_RDONLY, obj); + ret = mmap_full_file(filename, O_RDONLY, &obj->data, &obj->size, NULL); free(filename); return ret; } @@ -2071,7 +1959,7 @@ int osl_get_num_rows(const struct osl_table *t, unsigned *num_rows) * \param rank Result pointer. * * The rank is, by definition, the position of the row in the linear order - * determined by an inorder tree walk of the rbtree associated with column + * determined by an in-order tree walk of the rbtree associated with column * number \a col_num of \a table. * * \return Positive on success, negative on errors. @@ -2125,8 +2013,17 @@ int osl_get_nth_row(const struct osl_table *t, unsigned col_num, { struct osl_column *col; struct rb_node *node; - int ret = check_rbtree_col(t, col_num, &col); + unsigned num_rows; + int ret; + if (n == 0) + return -E_RB_KEY_NOT_FOUND; + ret = osl_get_num_rows(t, &num_rows); + if (ret < 0) + return ret; + if (n > num_rows) + return -E_RB_KEY_NOT_FOUND; + ret = check_rbtree_col(t, col_num, &col); if (ret < 0) return ret; node = rb_nth(col->rbtree.rb_node, n);