Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions ft/ft-ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,19 @@ void toku_ft_layer_destroy(void);
void toku_ft_serialize_layer_init(void);
void toku_ft_serialize_layer_destroy(void);

void toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_size);
void toku_maybe_truncate_file(int fd,
uint64_t size_used,
uint64_t expected_size,
uint64_t *new_size);
// Effect: truncate file if overallocated by at least 32MiB

void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size);
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
void toku_maybe_preallocate_in_file(int fd,
int64_t size,
int64_t expected_size,
int64_t *new_size,
const char *dbg_context);
// Effect: make the file bigger by either doubling it or growing by 16MiB
// whichever is less, until it is at least size
// Return 0 on success, otherwise an error number.

int toku_ft_get_fragmentation(FT_HANDLE ft_h, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result));
Expand Down
16 changes: 11 additions & 5 deletions ft/ft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -866,16 +866,22 @@ toku_ft_update_descriptor(FT ft, DESCRIPTOR desc)
// from the ft's cachefile. we do this so serialize code can
// update a descriptor before the ft is fully opened and has
// a valid cachefile.
void
toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
void toku_ft_update_descriptor_with_fd(FT ft, DESCRIPTOR desc, int fd) {
// the checksum is four bytes, so that's where the magic number comes from
// make space for the new descriptor and write it out to disk
DISKOFF offset, size;
size = toku_serialize_descriptor_size(desc) + 4;
ft->blocktable.realloc_descriptor_on_disk(size, &offset, ft, fd);
toku_serialize_descriptor_contents_to_fd(fd, desc, offset);

// cleanup the old descriptor and set the in-memory descriptor to the new one
const char *dbg_context = construct_dbg_context_for_write_others(
toku_cachefile_fname_in_env(ft->cf),
__func__,
"updating the descriptor for an ft");
ft->blocktable.realloc_descriptor_on_disk(
size, &offset, ft, fd, dbg_context);
toku_serialize_descriptor_contents_to_fd(fd, desc, offset, dbg_context);
destruct_dbg_context_for_write(dbg_context);
// cleanup the old descriptor and set the in-memory descriptor to the new
// one
toku_destroy_dbt(&ft->descriptor.dbt);
toku_clone_dbt(&ft->descriptor.dbt, desc->dbt);
}
Expand Down
73 changes: 54 additions & 19 deletions ft/logger/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,22 +449,44 @@ swap_inbuf_outbuf (TOKULOGGER logger)
assert(logger->inbuf.n_in_buf == 0);
}

static void
write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
// Effect: Write the contents of outbuf to logfile. Don't necessarily fsync (but it might, in which case fynced_lsn is updated).
// If the logfile gets too big, open the next one (that's the case where an fsync might happen).
// Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
static void write_outbuf_to_logfile(TOKULOGGER logger, LSN *fsynced_lsn)
// Effect: Write the contents of outbuf to logfile. Don't necessarily fsync
// (but it might, in which case fynced_lsn is updated).
// If the logfile gets too big, open the next one (that's the case where an
// fsync might happen).
// Entry and exit: Holds permission to modify output (and doesn't let it go, so
// it's ok to also hold the inlock).
{
if (logger->outbuf.n_in_buf>0) {
if (logger->outbuf.n_in_buf > 0) {
// Write the outbuf to disk, take accounting measurements

int fnamelen = strlen(logger->directory) + 50;
char fname[fnamelen];
snprintf(fname,
fnamelen,
"%s/log%012lld.tokulog%d",
logger->directory,
logger->next_log_file_number,
TOKU_LOG_VERSION);

const char *dbg_context = construct_dbg_context_for_logging(
fname, __func__, "flushing redo logs to the log file");
tokutime_t io_t0 = toku_time_now();
toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
toku_os_full_write(logger->fd,
logger->outbuf.buf,
logger->outbuf.n_in_buf,
dbg_context);
tokutime_t io_t1 = toku_time_now();
destruct_dbg_context(dbg_context);

logger->num_writes_to_disk++;
logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
logger->time_spent_writing_to_disk += (io_t1 - io_t0);

assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
assert(logger->outbuf.max_lsn_in_buf.lsn >
logger->written_lsn.lsn); // since there is something in the
// buffer, its LSN must be bigger than
// what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
Expand Down Expand Up @@ -672,30 +694,43 @@ void toku_logger_free_logfiles(char **logfiles, int n_logfiles) {
toku_free(logfiles);
}

static int open_logfile (TOKULOGGER logger)
static int open_logfile(TOKULOGGER logger)
// Entry and Exit: This thread has permission to modify the output.
{
int fnamelen = strlen(logger->directory)+50;
int fnamelen = strlen(logger->directory) + 50;
char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog%d", logger->directory, logger->next_log_file_number, TOKU_LOG_VERSION);
snprintf(fname,
fnamelen,
"%s/log%012lld.tokulog%d",
logger->directory,
logger->next_log_file_number,
TOKU_LOG_VERSION);
long long index = logger->next_log_file_number;
if (logger->write_log_files) {
logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRUSR+S_IWUSR);
if (logger->fd==-1) {
logger->fd = open(fname,
O_CREAT + O_WRONLY + O_TRUNC + O_EXCL + O_BINARY,
S_IRUSR + S_IWUSR);
if (logger->fd == -1) {
return get_error_errno();
}
fsync_logdir(logger);
logger->next_log_file_number++;
} else {
logger->fd = open(DEV_NULL_FILE, O_WRONLY+O_BINARY);
if (logger->fd==-1) {
logger->fd = open(DEV_NULL_FILE, O_WRONLY + O_BINARY);
if (logger->fd == -1) {
return get_error_errno();
}
}
toku_os_full_write(logger->fd, "tokulogg", 8);
int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
toku_os_full_write(logger->fd, &version_l, 4);
if ( logger->write_log_files ) {
const char *dbg_context = construct_dbg_context_for_logging(
fname, __func__, "initing the logfile upon opening");
toku_os_full_write(logger->fd, "tokulogg", 8, dbg_context);
int version_l = toku_htonl(log_format_version); // version MUST be in
// network byte order
// regardless of disk order
toku_os_full_write(logger->fd, &version_l, 4, dbg_context);
destruct_dbg_context(dbg_context);

if (logger->write_log_files) {
TOKULOGFILEINFO XMALLOC(lf_info);
lf_info->index = index;
lf_info->maxlsn = logger->written_lsn;
Expand Down
18 changes: 18 additions & 0 deletions ft/logger/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,21 @@ static inline void rbuf_ma_FILENUMS(struct rbuf *rb, memarena *ma, FILENUMS *fil
rbuf_ma_FILENUM(rb, ma, &(filenums->filenums[i]));
}
}

inline const char *construct_dbg_context_for_logging(char *fname,
const char *func_name,
const char *others) {
char *dbg_context =
(char *)toku_malloc(sizeof("Called by %s: writing to file %s for %s") +
sizeof(__func__) + PATH_MAX + sizeof(others));
sprintf(dbg_context,
"Called by %s: writing to file %s for %s",
func_name,
fname,
others);
return (const char *)dbg_context;
}

inline void destruct_dbg_context(const char *buffer) {
toku_free((void *)buffer);
}
21 changes: 14 additions & 7 deletions ft/serialize/block_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ void block_table::_realloc_on_disk_internal(BLOCKNUM b,

void block_table::_ensure_safe_write_unlocked(int fd,
DISKOFF block_size,
DISKOFF block_offset) {
DISKOFF block_offset,
const char *dbg_context) {
// Requires: holding _mutex
uint64_t size_needed = block_size + block_offset;
if (size_needed > _safe_file_size) {
Expand All @@ -489,7 +490,7 @@ void block_table::_ensure_safe_write_unlocked(int fd,

int64_t size_after;
toku_maybe_preallocate_in_file(
fd, size_needed, _safe_file_size, &size_after);
fd, size_needed, _safe_file_size, &size_after, dbg_context);

_mutex_lock();
_safe_file_size = size_after;
Expand All @@ -509,7 +510,11 @@ void block_table::realloc_on_disk(BLOCKNUM b,
_verify_valid_freeable_blocknum(t, b);
_realloc_on_disk_internal(b, size, offset, ft, for_checkpoint);

_ensure_safe_write_unlocked(fd, size, *offset);
char *fname = toku_cachefile_fname_in_env(ft->cf);
const char *dbg_context =
construct_dbg_context_for_write_node(fname, for_checkpoint, __func__);
_ensure_safe_write_unlocked(fd, size, *offset, dbg_context);
destruct_dbg_context_for_write(dbg_context);
_mutex_unlock();
}

Expand Down Expand Up @@ -553,7 +558,8 @@ void block_table::_alloc_inprogress_translation_on_disk_unlocked() {
void block_table::serialize_translation_to_wbuf(int fd,
struct wbuf *w,
int64_t *address,
int64_t *size) {
int64_t *size,
const char *dbg_context) {
_mutex_lock();
struct translation *t = &_inprogress;

Expand Down Expand Up @@ -599,7 +605,7 @@ void block_table::serialize_translation_to_wbuf(int fd,
*size = size_translation;
invariant((*address) % 512 == 0);

_ensure_safe_write_unlocked(fd, size_aligned, *address);
_ensure_safe_write_unlocked(fd, size_aligned, *address, dbg_context);
_mutex_unlock();
}

Expand Down Expand Up @@ -1018,10 +1024,11 @@ void block_table::_realloc_descriptor_on_disk_unlocked(DISKOFF size,
void block_table::realloc_descriptor_on_disk(DISKOFF size,
DISKOFF *offset,
FT ft,
int fd) {
int fd,
const char *dbg_context) {
_mutex_lock();
_realloc_descriptor_on_disk_unlocked(size, offset, ft);
_ensure_safe_write_unlocked(fd, size, *offset);
_ensure_safe_write_unlocked(fd, size, *offset, dbg_context);
_mutex_unlock();
}

Expand Down
47 changes: 44 additions & 3 deletions ft/serialize/block_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class block_table {
void realloc_descriptor_on_disk(DISKOFF size,
DISKOFF *offset,
struct ft *ft,
int fd);
int fd,
const char *dbg_context);
void get_descriptor_offset_size(DISKOFF *offset, DISKOFF *size);

// External verfication
Expand All @@ -148,7 +149,8 @@ class block_table {
void serialize_translation_to_wbuf(int fd,
struct wbuf *w,
int64_t *address,
int64_t *size);
int64_t *size,
const char *dbg_context);

// DEBUG ONLY (ftdump included), tests included
void blocknum_dump_translation(BLOCKNUM b);
Expand Down Expand Up @@ -259,7 +261,8 @@ class block_table {
void _maybe_truncate_file(int fd, uint64_t size_needed_before);
void _ensure_safe_write_unlocked(int fd,
DISKOFF block_size,
DISKOFF block_offset);
DISKOFF block_offset,
const char *dbg_context);

// Verification
bool _is_valid_blocknum(struct translation *t, BLOCKNUM b);
Expand Down Expand Up @@ -338,3 +341,41 @@ static inline void rbuf_ma_BLOCKNUM(struct rbuf *rb,
BLOCKNUM *blocknum) {
*blocknum = rbuf_blocknum(rb);
}

inline const char *construct_dbg_context_for_write_node(const char *fname,
bool for_checkpoint,
const char *func_name) {
char *dbg_context = (char *)toku_malloc(
sizeof("Called by %s: writing to file %s for checkpoint") +
sizeof(func_name) + PATH_MAX);

if (for_checkpoint)
sprintf(dbg_context,
"Called by %s: writing to file %s for checkpoint",
func_name,
fname);
else
sprintf(dbg_context,
"Called by %s; writing to file %s for eviction",
func_name,
fname);

return (const char *)dbg_context;
}

inline const char *construct_dbg_context_for_write_others(const char *fname,
const char *func_name,
const char *others) {
char *dbg_context =
(char *)toku_malloc(sizeof("Called by %s: writing to file %s for %s") +
sizeof(func_name) + PATH_MAX + sizeof(others));
sprintf(dbg_context,
"Called by %s: writing to file %s for %s",
func_name,
fname,
others);
return (const char *)dbg_context;
}
inline void destruct_dbg_context_for_write(const char *buffer) {
toku_free((void *)buffer);
}
Loading