Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions portability/toku_pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include <pthread.h>
#include <time.h>
#include <stdint.h>
#include <stdlib.h>

#include "toku_assert.h"

Expand Down Expand Up @@ -247,6 +248,31 @@ static inline void
toku_cond_wait(toku_cond_t *cond, toku_mutex_t *mutex) {
#if TOKU_PTHREAD_DEBUG
invariant(mutex->locked);

switch (random()%3) {
case 0:
// Sometimes we return immediately. This is one case of a
// spurious wakeup, which cond_wait is allowed to do. We
// want to exercise the users of this code to make sure
// they handle it.
return;
case 1:
// Sometimes we unlock the mutex and relock it. This is
// another case of a spurious wakeup. It's different from
// case 0, in that it gives someone else a chance to run,
// and then we can spuriously wake up after they have
// possibly released the mutex without setting the
// condition to the one that our caller expects.
toku_mutex_unlock(mutex);
toku_mutex_lock(mutex);
return;
default:
// The default is to simply call pthread_cond_wait (which
// may still spuriously wake up, but won't do it reliably
// enough to test the caller's use of the condition).
break;
}

mutex->locked = false;
mutex->owner = 0;
#endif
Expand Down
204 changes: 54 additions & 150 deletions util/frwlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,56 +52,19 @@ static int get_local_tid() {
}

void frwlock::init(toku_mutex_t *const mutex) {
m_queue = new std::queue<frwlock_queueitem>();
m_mutex = mutex;

m_num_readers = 0;
m_num_writers = 0;
m_num_want_write = 0;
m_num_want_read = 0;
m_num_signaled_readers = 0;
m_num_expensive_want_write = 0;

toku_cond_init(&m_wait_read, nullptr);
m_queue_item_read.cond = &m_wait_read;
m_queue_item_read.next = nullptr;
m_wait_read_is_in_queue = false;
m_current_writer_expensive = false;
m_read_wait_expensive = false;

m_current_writer_tid = -1;
m_blocking_writer_context_id = CTX_INVALID;

m_wait_head = nullptr;
m_wait_tail = nullptr;
}

void frwlock::deinit(void) {
toku_cond_destroy(&m_wait_read);
}

bool frwlock::queue_is_empty(void) const {
return m_wait_head == nullptr;
}

void frwlock::enq_item(queue_item *const item) {
paranoid_invariant_null(item->next);
if (m_wait_tail != nullptr) {
m_wait_tail->next = item;
} else {
paranoid_invariant_null(m_wait_head);
m_wait_head = item;
}
m_wait_tail = item;
}

toku_cond_t *frwlock::deq_item(void) {
paranoid_invariant_notnull(m_wait_head);
paranoid_invariant_notnull(m_wait_tail);
queue_item *item = m_wait_head;
m_wait_head = m_wait_head->next;
if (m_wait_tail == item) {
m_wait_tail = nullptr;
}
return item->cond;
assert(m_queue->empty());
delete m_queue;
}

// Prerequisite: Holds m_mutex.
Expand All @@ -112,32 +75,25 @@ void frwlock::write_lock(bool expensive) {
}

toku_cond_t cond = TOKU_COND_INITIALIZER;
queue_item item = { .cond = &cond, .next = nullptr };
this->enq_item(&item);

// Wait for our turn.
++m_num_want_write;
m_queue->push(frwlock_queueitem(&cond,
get_local_tid(),
toku_thread_get_context()->get_id()));
if (expensive) {
++m_num_expensive_want_write;
}
if (m_num_writers == 0 && m_num_want_write == 1) {
// We are the first to want a write lock. No new readers can get the lock.
// Set our thread id and context for proper instrumentation.
// see: toku_context_note_frwlock_contention()
m_current_writer_tid = get_local_tid();
m_blocking_writer_context_id = toku_thread_get_context()->get_id();

while (m_num_writers || m_num_readers || m_queue->front().m_cond != &cond) {
// Wait until this cond variable is woken up. We could get a spurious wakeup.
toku_cond_wait(&cond, m_mutex);
}
toku_cond_wait(&cond, m_mutex);
m_queue->pop();
toku_cond_destroy(&cond);

// Now it's our turn.
paranoid_invariant(m_num_want_write > 0);
paranoid_invariant_zero(m_num_readers);
paranoid_invariant_zero(m_num_writers);
paranoid_invariant_zero(m_num_signaled_readers);

// Not waiting anymore; grab the lock.
--m_num_want_write;
if (expensive) {
--m_num_expensive_want_write;
}
Expand All @@ -149,12 +105,10 @@ void frwlock::write_lock(bool expensive) {

bool frwlock::try_write_lock(bool expensive) {
toku_mutex_assert_locked(m_mutex);
if (m_num_readers > 0 || m_num_writers > 0 || m_num_signaled_readers > 0 || m_num_want_write > 0) {
if (m_num_readers > 0 || m_num_writers > 0 || !m_queue->empty()) {
return false;
}
// No one holds the lock. Grant the write lock.
paranoid_invariant_zero(m_num_want_write);
paranoid_invariant_zero(m_num_want_read);
m_num_writers = 1;
m_current_writer_expensive = expensive;
m_current_writer_tid = get_local_tid();
Expand All @@ -164,45 +118,38 @@ bool frwlock::try_write_lock(bool expensive) {

void frwlock::read_lock(void) {
toku_mutex_assert_locked(m_mutex);
if (m_num_writers > 0 || m_num_want_write > 0) {
if (!m_wait_read_is_in_queue) {
// Throw the read cond_t onto the queue.
paranoid_invariant(m_num_signaled_readers == m_num_want_read);
m_queue_item_read.next = nullptr;
this->enq_item(&m_queue_item_read);
m_wait_read_is_in_queue = true;
paranoid_invariant(!m_read_wait_expensive);
m_read_wait_expensive = (
m_current_writer_expensive ||
(m_num_expensive_want_write > 0)
);
}

// Note this contention event in engine status.
if (this->try_read_lock()) return;

toku_cond_t cond = TOKU_COND_INITIALIZER;
m_queue->push(frwlock_queueitem(&cond));
while (m_num_writers || m_queue->front().m_cond != &cond) {
// We know the queue isn't empty (since we are in it), so it's
// safe to call m_queue->front().
toku_context_note_frwlock_contention(
toku_thread_get_context()->get_id(),
m_blocking_writer_context_id
);

// Wait for our turn.
++m_num_want_read;
toku_cond_wait(&m_wait_read, m_mutex);

// Now it's our turn.
paranoid_invariant_zero(m_num_writers);
paranoid_invariant(m_num_want_read > 0);
paranoid_invariant(m_num_signaled_readers > 0);

// Not waiting anymore; grab the lock.
--m_num_want_read;
--m_num_signaled_readers;
m_blocking_writer_context_id);
toku_cond_wait(&cond, m_mutex);
}
m_queue->pop();
toku_cond_destroy(&cond);
paranoid_invariant_zero(m_num_writers);
++m_num_readers;
if (!m_queue->empty()) {
const frwlock_queueitem &qi = m_queue->front();
if (qi.m_is_read) {
// The next guy is a reader, so wake him up too.
toku_cond_signal(qi.m_cond);
} else {
// The next guy is a writer, so record his thread id and context
// for contention instrumentation.
m_current_writer_tid = qi.m_writer_tid;
m_blocking_writer_context_id = qi.m_writer_context_id;
}
}
}

bool frwlock::try_read_lock(void) {
toku_mutex_assert_locked(m_mutex);
if (m_num_writers > 0 || m_num_want_write > 0) {
if (m_num_writers > 0 || !m_queue->empty()) {
return false;
}
// No writer holds the lock.
Expand All @@ -212,58 +159,19 @@ bool frwlock::try_read_lock(void) {
return true;
}

void frwlock::maybe_signal_next_writer(void) {
if (m_num_want_write > 0 && m_num_signaled_readers == 0 && m_num_readers == 0) {
toku_cond_t *cond = this->deq_item();
paranoid_invariant(cond != &m_wait_read);
// Grant write lock to waiting writer.
paranoid_invariant(m_num_want_write > 0);
toku_cond_signal(cond);
}
}

void frwlock::read_unlock(void) {
toku_mutex_assert_locked(m_mutex);
paranoid_invariant(m_num_writers == 0);
paranoid_invariant(m_num_readers > 0);
--m_num_readers;
this->maybe_signal_next_writer();
if (m_num_readers == 0 && !m_queue->empty()) {
toku_cond_signal(m_queue->front().m_cond);
}
}

bool frwlock::read_lock_is_expensive(void) {
toku_mutex_assert_locked(m_mutex);
if (m_wait_read_is_in_queue) {
return m_read_wait_expensive;
}
else {
return m_current_writer_expensive || (m_num_expensive_want_write > 0);
}
}


void frwlock::maybe_signal_or_broadcast_next(void) {
paranoid_invariant(m_num_signaled_readers == 0);

if (this->queue_is_empty()) {
paranoid_invariant(m_num_want_write == 0);
paranoid_invariant(m_num_want_read == 0);
return;
}
toku_cond_t *cond = this->deq_item();
if (cond == &m_wait_read) {
// Grant read locks to all waiting readers
paranoid_invariant(m_wait_read_is_in_queue);
paranoid_invariant(m_num_want_read > 0);
m_num_signaled_readers = m_num_want_read;
m_wait_read_is_in_queue = false;
m_read_wait_expensive = false;
toku_cond_broadcast(cond);
}
else {
// Grant write lock to waiting writer.
paranoid_invariant(m_num_want_write > 0);
toku_cond_signal(cond);
}
return m_num_expensive_want_write > 0 || m_current_writer_expensive;
}

void frwlock::write_unlock(void) {
Expand All @@ -273,39 +181,35 @@ void frwlock::write_unlock(void) {
m_current_writer_expensive = false;
m_current_writer_tid = -1;
m_blocking_writer_context_id = CTX_INVALID;
this->maybe_signal_or_broadcast_next();
if (!m_queue->empty()) {
toku_cond_signal(m_queue->front().m_cond);
}
}
bool frwlock::write_lock_is_expensive(void) {
toku_mutex_assert_locked(m_mutex);
return (m_num_expensive_want_write > 0) || (m_current_writer_expensive);
return (m_num_expensive_want_write > 0) || m_current_writer_expensive;
}


uint32_t frwlock::users(void) const {
toku_mutex_assert_locked(m_mutex);
return m_num_readers + m_num_writers + m_num_want_read + m_num_want_write;
}
uint32_t frwlock::blocked_users(void) const {
toku_mutex_assert_locked(m_mutex);
return m_num_want_read + m_num_want_write;
return m_num_readers + m_num_writers + m_queue->size();
}

uint32_t frwlock::writers(void) const {
// this is sometimes called as "assert(lock->writers())" when we
// The following comment was found; if true, it means the code is
// incorrect (we would at least have to make m_num_writers an
// atomic variable). Unfortunately, such code actually exists, so
// m_num_writers has been made atomic.

// This is sometimes called as "assert(lock->writers())" when we
// assume we have the write lock. if that's the assumption, we may
// not own the mutex, so we don't assert_locked here
//
return m_num_writers;
}
uint32_t frwlock::blocked_writers(void) const {
toku_mutex_assert_locked(m_mutex);
return m_num_want_write;
}

uint32_t frwlock::readers(void) const {
toku_mutex_assert_locked(m_mutex);
return m_num_readers;
}
uint32_t frwlock::blocked_readers(void) const {
toku_mutex_assert_locked(m_mutex);
return m_num_want_read;
}

} // namespace toku
Loading