1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@
/output
/test/output
build/
.cache

# Ignore hidden files
.*
1 change: 1 addition & 0 deletions README.md
@@ -38,6 +38,7 @@ You can use it to:
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task (experimental)](docs/cn/bthread_active_task.md)
* Client
* [Basics](docs/en/client.md)
* [Error code](docs/en/error_code.md)
1 change: 1 addition & 0 deletions README_cn.md
@@ -38,6 +38,7 @@
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task (experimental)](docs/cn/bthread_active_task.md)
* [bthread tracer](docs/cn/bthread_tracer.md)
* Client
* [Basics](docs/cn/client.md)
386 changes: 386 additions & 0 deletions docs/cn/bthread_active_task.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions docs/cn/threading_overview.md
@@ -43,3 +43,28 @@
Flow control in asynchronous programming is full of traps even for experts. Any suspending operation, such as sleeping for a while or waiting for something to finish, implies that users have to save state explicitly and restore it in callbacks. Asynchronous code is often written as state machines. A few suspensions are troublesome but still manageable. The problem is that once a suspension occurs inside a condition, loop or sub-function, it is almost impossible to write a state machine that many people can understand and maintain, although the scenario is quite common in distributed systems, where a node often needs to interact with multiple nodes simultaneously. In addition, if the wakeup can be triggered by more than one event (such as the fd having data or a timeout being reached), suspension and resumption are prone to race conditions, which demand strong multi-threaded programming skills. Syntactic sugar (such as lambdas) makes coding less troublesome but does not reduce the difficulty.

Shared pointers are common in asynchronous programming, which seems convenient but also makes the ownership of memory elusive. If memory is leaked, it is hard to locate the code that forgot to release it; if a segmentation fault happens, it is also unknown where the double-free occurred. User code that relies heavily on reference counting is hard to keep at good quality, and one easily spends a long time on memory issues. If reference counts are maintained manually, keeping quality is even harder and maintainers are reluctant to improve the code. The lack of context also prevents [RAII](http://en.wikipedia.org/wiki/Resource_Acquisition_Is_Initialization) from playing its full role; sometimes one has to lock outside a callback and unlock inside it, which is very error-prone in practice.

## Butex wait/wake ordering (a practical rule)

When using `butex_wait`/`butex_wake*` directly, always follow:

1. The waker writes the result/state first, then calls `butex_wake*`.
2. The waiter re-checks the predicate after every return from `butex_wait`.

A return value of `0` from `butex_wait` only means "woken from the butex wait queue", not "the business condition already holds".

Typical pattern:

```cpp
// waker
state.store(new_value, butil::memory_order_release);
bthread::butex_wake(&state);

// waiter
while (state.load(butil::memory_order_acquire) == expected_value) {
    if (bthread::butex_wait(&state, expected_value, NULL) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR) {
        // handle timeout/interrupt/stop errors
        break;  // avoid spinning if the error persists
    }
}
```
25 changes: 25 additions & 0 deletions docs/en/threading_overview.md
@@ -43,3 +43,28 @@
Flow control in asynchronous programming is difficult even for experts. Any suspending operation, such as sleeping for a while or waiting for something to finish, implies that users have to save state explicitly and restore it in callbacks. Asynchronous code is often written as state machines. A few suspensions are troublesome, but still manageable. The problem is that once a suspension occurs inside a condition, loop or sub-function, it's almost impossible to write a state machine that can be understood and maintained by many people, although the scenario is quite common in distributed systems, where a node often needs to interact with multiple nodes simultaneously. In addition, if the wakeup can be triggered by more than one event (such as the fd having data or a timeout being reached), the suspension and resumption are prone to race conditions, which require good multi-threaded programming skills to handle. Syntactic sugar (such as lambdas) just makes coding less troublesome rather than reducing the difficulty.

Shared pointers are common in asynchronous programming, which seems convenient, but they also make the ownership of memory elusive. If memory is leaked, it's difficult to locate the code that forgot to release it; if a segmentation fault happens, where the double-free occurred is also unknown. Code with a lot of reference counting is hard to keep at good quality and may waste a lot of time on debugging memory-related issues. If references are counted manually, keeping the quality of the code is even harder and maintainers are less willing to modify it. [RAII](http://en.wikipedia.org/wiki/Resource_Acquisition_Is_Initialization) cannot be used in many asynchronous scenarios; sometimes resources need to be locked before a callback and unlocked inside it, which is very error-prone in practice.

## Butex wait/wake ordering (practical rule)

When using `butex_wait`/`butex_wake*` directly, follow this rule strictly:

1. Waker writes result/state first, then calls `butex_wake*`.
2. Waiter always re-checks predicate after every `butex_wait` return.

`butex_wait` returning `0` only means "woken from butex queue", not "predicate is true".

Typical pattern:

```cpp
// waker
state.store(new_value, butil::memory_order_release);
bthread::butex_wake(&state);

// waiter
while (state.load(butil::memory_order_acquire) == expected_value) {
    if (bthread::butex_wait(&state, expected_value, NULL) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR) {
        // handle timeout/stop/etc.
        break;  // avoid spinning if the error persists
    }
}
```
142 changes: 142 additions & 0 deletions src/bthread/bthread.cpp
@@ -20,6 +20,9 @@
// Date: Tue Jul 10 17:40:58 CST 2012

#include <sys/syscall.h>
#include <limits>
#include <string.h>
#include <vector>
#include <gflags/gflags.h>
#include "butil/macros.h" // BAIDU_CASSERT
#include "butil/logging.h"
@@ -85,6 +88,8 @@ pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Notice that we can't declare the variable as atomic<TaskControl*> which
// are not constructed before main().
TaskControl* g_task_control = NULL;
static pthread_mutex_t g_active_task_registry_mutex = PTHREAD_MUTEX_INITIALIZER;
static std::vector<bthread_active_task_type_t> g_active_task_types;

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
@@ -96,6 +101,89 @@ inline TaskControl* get_task_control() {
    return g_task_control;
}

static bool normalize_active_task_type(const bthread_active_task_type_t* in,
                                       bthread_active_task_type_t* out) {
    if (in == NULL || out == NULL) {
        return false;
    }
    if (in->struct_size < sizeof(bthread_active_task_type_t)) {
        return false;
    }
    if (in->name == NULL || in->name[0] == '\0') {
        return false;
    }
    if (in->worker_init == NULL && in->worker_destroy == NULL &&
        in->harvest == NULL) {
        return false;
    }
    memset(out, 0, sizeof(*out));
    memcpy(out, in, sizeof(*out));
    out->struct_size = sizeof(*out);
    return true;
}

void get_active_task_types_snapshot(std::vector<bthread_active_task_type_t>* out) {
    if (out == NULL) {
        return;
    }
    BAIDU_SCOPED_LOCK(g_active_task_registry_mutex);
    *out = g_active_task_types;
}

static inline TaskMeta* current_normal_bthread_for_local_pin(int* err) {
    TaskGroup* g = tls_task_group;
    if (g == NULL || g->is_current_main_task() || g->is_current_pthread_task()) {
        if (err) {
            *err = EPERM;
        }
        return NULL;
    }
    if (err) {
        *err = 0;
    }
    return g->current_task();
}

static inline int enter_local_pin_scope(TaskMeta* m, TaskGroup* g) {
    if (m == NULL || g == NULL) {
        return EINVAL;
    }
    if (!m->local_pin_enabled) {
        m->local_pin_home_group = g;
        m->local_pin_home_control = g->control();
        m->local_pin_home_tag = g->tag();
        m->local_pin_depth = 1;
        m->local_pin_enabled = true;
        return 0;
    }
    if (m->local_pin_home_group != g ||
        m->local_pin_home_control != g->control() ||
        m->local_pin_home_tag != g->tag()) {
        return EPERM;
    }
    if (m->local_pin_depth == std::numeric_limits<uint16_t>::max()) {
        return EINVAL;
    }
    ++m->local_pin_depth;
    return 0;
}

static inline int leave_local_pin_scope(TaskMeta* m) {
    if (m == NULL) {
        return EINVAL;
    }
    if (!m->local_pin_enabled || m->local_pin_depth == 0) {
        return EINVAL;
    }
    if (--m->local_pin_depth == 0) {
        m->local_pin_enabled = false;
        m->local_pin_home_group = NULL;
        m->local_pin_home_control = NULL;
        m->local_pin_home_tag = BTHREAD_TAG_INVALID;
    }
    return 0;
}

inline TaskControl* get_or_new_task_control() {
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
    TaskControl* c = p->load(butil::memory_order_consume);
@@ -146,6 +234,11 @@ bthread_t init_for_pthread_stack_trace() {
    }

    pthread_fake_meta->attr = BTHREAD_ATTR_PTHREAD;
    pthread_fake_meta->local_pin_home_group = NULL;
    pthread_fake_meta->local_pin_home_control = NULL;
    pthread_fake_meta->local_pin_home_tag = BTHREAD_TAG_INVALID;
    pthread_fake_meta->local_pin_depth = 0;
    pthread_fake_meta->local_pin_enabled = false;
    pthread_fake_meta->tid = make_tid(*pthread_fake_meta->version_butex, slot);
    // Make TaskTracer use signal trace mode for pthread.
    c->_task_tracer.set_running_status(syscall(SYS_gettid), pthread_fake_meta);
@@ -328,6 +421,55 @@ struct TidJoiner {

extern "C" {

int bthread_register_active_task_type(const bthread_active_task_type_t* type) {
    bthread_active_task_type_t normalized;
    if (!bthread::normalize_active_task_type(type, &normalized)) {
        return EINVAL;
    }
    BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
    if (bthread::get_task_control() != NULL) {
        return EPERM;
    }
    BAIDU_SCOPED_LOCK(bthread::g_active_task_registry_mutex);
    bthread::g_active_task_types.push_back(normalized);
    return 0;
}

int bthread_butex_wake_within(const bthread_active_task_ctx_t* ctx,
                              void* butex) {
    return bthread::TaskGroup::butex_wake_within_active_task(ctx, butex);
}

int bthread_butex_wait_local(void* butex, int expected_value,
                             const struct timespec* abstime) {
    if (butex == NULL) {
        errno = EINVAL;
        return -1;
    }
    int err = 0;
    bthread::TaskMeta* m = bthread::current_normal_bthread_for_local_pin(&err);
    if (m == NULL) {
        errno = err;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    err = bthread::enter_local_pin_scope(m, g);
    if (err != 0) {
        errno = err;
        return -1;
    }
    const int rc = bthread::butex_wait(butex, expected_value, abstime);
    const int saved_errno = errno;
    const int leave_err = bthread::leave_local_pin_scope(m);
    if (leave_err != 0) {
        LOG(ERROR) << "Fail to leave local pin scope after bthread_butex_wait_local";
        errno = leave_err;
        return -1;
    }
    errno = saved_errno;
    return rc;
}

int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),