Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,55 +1,20 @@
cmake_minimum_required(VERSION 2.8)
project(libco)
cmake_minimum_required(VERSION 2.8.12)

# This for mac osx only
set(CMAKE_MACOSX_RPATH 0)
message(STATUS "CURRENT_DIR: ${CMAKE_CURRENT_LIST_DIR}")

# Set lib version
set(LIBCO_VERSION 0.5)
#set (CMAKE_CXX_FLAGS "-std=c++11 -pthread")
#set (CMAKE_CXX_STANDARD 11)

# Set cflags
set(CMAKE_C_FLAGS ${CMAKE_C_FLAGS} -g -fno-strict-aliasing -O2 -Wall -export-dynamic -Wall -pipe -D_GNU_SOURCE -D_REENTRANT -fPIC -Wno-deprecated -m64)
enable_language(C CXX ASM)

# Use c and asm
enable_language(C ASM)
message(STATUS "CMAKE_CXX_FLAGS: ${CMAKE_CXX_FLAGS}")
message(STATUS "CMAKE_CXX_STANDARD: ${CMAKE_CXX_STANDARD}")

# Add source files
set(SOURCE_FILES
co_epoll.cpp
co_hook_sys_call.cpp
co_routine.cpp
coctx.cpp
coctx_swap.S)
# 查找当前目录下的所有源文件
# 并将名称保存到 DIR_LIB_SRCS 变量
aux_source_directory(. DIR_LIB_SRCS)
set(DIR_LIB_SRCS ${DIR_LIB_SRCS} coctx_swap.S)
# 生成
#add_executable(DSAdmin ${DIR_LIB_SRCS})
add_library (libco STATIC ${DIR_LIB_SRCS})

# Add static and shared library target
add_library(colib_static STATIC ${SOURCE_FILES})
add_library(colib_shared SHARED ${SOURCE_FILES})

# Set library output name
set_target_properties(colib_static PROPERTIES OUTPUT_NAME colib)
set_target_properties(colib_shared PROPERTIES OUTPUT_NAME colib)

set_target_properties(colib_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
set_target_properties(colib_shared PROPERTIES CLEAN_DIRECT_OUTPUT 1)

# Set shared library version, will generate libcolib.${LIBCO_VERSION}.so and a symbol link named libcolib.so
# For mac osx, the extension name will be .dylib
set_target_properties(colib_shared PROPERTIES VERSION ${LIBCO_VERSION} SOVERSION ${LIBCO_VERSION})



# Macro for add example target
macro(add_example_target EXAMPLE_TARGET)
add_executable("example_${EXAMPLE_TARGET}" "example_${EXAMPLE_TARGET}.cpp")
target_link_libraries("example_${EXAMPLE_TARGET}" colib_static pthread dl)
endmacro(add_example_target)

add_example_target(closure)
add_example_target(cond)
add_example_target(copystack)
add_example_target(echocli)
add_example_target(echosvr)
add_example_target(poll)
add_example_target(setenv)
add_example_target(specific)
add_example_target(thread)
48 changes: 48 additions & 0 deletions co_bt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import gdb

class CoroutinesCallStack(gdb.Command):
def __init__(self):
super(CoroutinesCallStack, self).__init__("co_bt", gdb.COMMAND_STACK)

def invoke(self, arg, from_tty):
coctx_array_type = gdb.lookup_type("void").pointer().pointer()
coctx_array = gdb.parse_and_eval("coctx_array").cast(coctx_array_type)

coctx_array_length = int(gdb.parse_and_eval("coctx_array_length"))
print("Coroutine count: %d" % coctx_array_length)

print("print callstacks...\n")
max_depth = 24
for idx in range(coctx_array_length):
coctx_ptr = coctx_array[idx].cast(gdb.lookup_type("void").pointer())
coctx_addr = int(coctx_ptr.dereference())

rbp = int(gdb.Value(coctx_addr + 48).cast(gdb.lookup_type("long").pointer()))
rsp = int(gdb.Value(coctx_addr + 104).cast(gdb.lookup_type("long").pointer()))
rip = int(gdb.Value(coctx_addr + 72).cast(gdb.lookup_type("long").pointer()))
rbx = int(gdb.Value(coctx_addr + 96).cast(gdb.lookup_type("long").pointer()))

print("Coroutine %d (context address: 0x%x):" % (idx, coctx_addr))
for i in range(max_depth):
# 读取当前栈帧的调用地址和栈帧指针
call_addr = int(gdb.Value(rbp + 8).cast(gdb.lookup_type("long").pointer()))
frame_ptr = int(gdb.Value(rbp).cast(gdb.lookup_type("long").pointer()))

# 如果调用地址为0,说明已经到达调用栈底部
if call_addr == 0:
break

# 打印当前栈帧的信息
print(" frame %d: 0x%x" % (i, call_addr))

# 更新栈帧指针和调用地址,继续遍历调用栈
rbp = frame_ptr
rip = call_addr

# 如果遍历到了最大深度,结束遍历
if i == max_depth - 1:
print(" ...")
break
print("\n")

CoroutinesCallStack()
7 changes: 5 additions & 2 deletions co_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <string.h>

#if !defined( __APPLE__ ) && !defined( __FreeBSD__ )

#include <unistd.h>
int co_epoll_wait( int epfd,struct co_epoll_res *events,int maxevents,int timeout )
{
return epoll_wait( epfd,events->events,maxevents,timeout );
Expand All @@ -36,7 +36,10 @@ int co_epoll_create( int size )
{
return epoll_create( size );
}

int co_epoll_close( int fd )
{
return close(fd);
}
struct co_epoll_res *co_epoll_res_alloc( int n )
{
struct co_epoll_res * ptr =
Expand Down
1 change: 1 addition & 0 deletions co_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct co_epoll_res
int co_epoll_wait( int epfd,struct co_epoll_res *events,int maxevents,int timeout );
int co_epoll_ctl( int epfd,int op,int fd,struct epoll_event * );
int co_epoll_create( int size );
int co_epoll_close( int fd );
struct co_epoll_res *co_epoll_res_alloc( int n );
void co_epoll_res_free( struct co_epoll_res * );

Expand Down
115 changes: 109 additions & 6 deletions co_routine.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Tencent is pleased to support the open source community by making Libco available.

* Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved.
Expand Down Expand Up @@ -39,6 +39,7 @@
#include <sys/syscall.h>
#include <unistd.h>
#include <limits.h>
#include <bitset>

extern "C"
{
Expand Down Expand Up @@ -518,6 +519,49 @@ struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAt
return lp;
}

#define DEBUG_ALL_COCTX
#ifdef DEBUG_ALL_COCTX
coctx_t** coctx_array = nullptr;
size_t coctx_array_length = 0;
size_t coctx_array_capacity = 0;
void new_coctx_array(size_t new_capacity)
{
coctx_t** new_array = (coctx_t**)malloc(new_capacity * sizeof(coctx_t*));
memcpy(new_array, coctx_array, coctx_array_length * sizeof(coctx_t*));
free(coctx_array);
coctx_array = new_array;
coctx_array_capacity = new_capacity;
}

void add_coctx(coctx_t* coctx)
{
if (coctx_array_length >= coctx_array_capacity)
{
size_t more_capacity = coctx_array_capacity ? coctx_array_capacity * 2 : 1;
new_coctx_array(more_capacity);
}
coctx_array[coctx_array_length++] = coctx;
}

void remove_coctx(coctx_t* coctx)
{
for (size_t i = 0; i < coctx_array_length; i++)
{
if (coctx_array[i] == coctx)
{
coctx_array_length--;
memcpy(&coctx_array[i], &coctx_array[i+1], (coctx_array_length - i) * sizeof(coctx_t*));
if (coctx_array_length < coctx_array_capacity/2)
{
size_t less_capacity = coctx_array_capacity/2;
new_coctx_array(less_capacity);
}
return;
}
}
}
#endif

int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
Expand All @@ -526,10 +570,16 @@ int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
#ifdef DEBUG_ALL_COCTX
add_coctx(&co->ctx);
#endif
return 0;
}
void co_free( stCoRoutine_t *co )
{
#ifdef DEBUG_ALL_COCTX
remove_coctx(&co->ctx);
#endif
if (!co->cIsShareStack)
{
free(co->stack_mem->stack_buffer);
Expand All @@ -545,6 +595,7 @@ void co_free( stCoRoutine_t *co )
if(co->stack_mem->occupy_co == co)
co->stack_mem->occupy_co = NULL;
}
co->pfn = NULL; //joezzhu fix the memory leak bug at 2022-03-09

free( co );
}
Expand Down Expand Up @@ -741,8 +792,8 @@ static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;

void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;
stCoRoutineEnv_t *env = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
//stCoRoutineEnv_t *env = gCoEnvPerThread;

env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
Expand All @@ -757,6 +808,8 @@ void co_init_curr_thread_env()

stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );

gCoEnvPerThread = env;
}
stCoRoutineEnv_t *co_get_curr_thread_env()
{
Expand Down Expand Up @@ -896,10 +949,26 @@ void FreeEpoll( stCoEpoll_t *ctx )
free( ctx->pstTimeoutList );
FreeTimeout( ctx->pTimeout );
co_epoll_res_free( ctx->result );
co_epoll_close( ctx->iEpollFd );
}
free( ctx );
}

void RecreateEpollFd( stCoRoutineEnv_t *env )
{
if (env->pEpoll)
{
stCoEpoll_t *ctx = env->pEpoll;
co_epoll_close( ctx->iEpollFd );
ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE );
}
else
{
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
}

stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env )
{
return env->pCallStack[ env->iCallStackSize - 1 ];
Expand Down Expand Up @@ -1054,6 +1123,37 @@ struct stHookPThreadSpec_t
size = 1024
};
};
static std::bitset<1024> specific_mask;
int co_create_specific( pthread_key_t* key)
{
if (key)
{
stCoRoutine_t *co = GetCurrThreadCo();
if (!co || co->cIsMain) {
return pthread_key_create(key, NULL);
}
for (auto i = 0; i < specific_mask.size(); i++) {
if (!specific_mask.test(i)) {
specific_mask.set(i);
*key = i;
return 0;
}
}
*key = -1;
}
return -1;
}
int co_delete_specific( pthread_key_t key )
{
stCoRoutine_t *co = GetCurrThreadCo();
if (!co || co->cIsMain) {
return pthread_key_delete(key);
}
assert(0 <= key && key < 1024);
assert(specific_mask.test(key));
specific_mask.reset(key);
return 0;
}
void *co_getspecific(pthread_key_t key)
{
stCoRoutine_t *co = GetCurrThreadCo();
Expand Down Expand Up @@ -1113,6 +1213,7 @@ struct stCoCond_t
static void OnSignalProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co->cCondTimeout = ap->bTimeout ? 1 : 0;
co_resume( co );
}

Expand Down Expand Up @@ -1148,8 +1249,10 @@ int co_cond_broadcast( stCoCond_t *si )

int co_cond_timedwait( stCoCond_t *link,int ms )
{
stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t));
psi->timeout.pArg = GetCurrThreadCo();
stCoRoutine_t* co = GetCurrThreadCo();
co->cCondTimeout = 0;
stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t));
psi->timeout.pArg = co; //GetCurrThreadCo();
psi->timeout.pfnProcess = OnSignalProcessEvent;

if( ms > 0 )
Expand All @@ -1172,7 +1275,7 @@ int co_cond_timedwait( stCoCond_t *link,int ms )
RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi );
free(psi);

return 0;
return co->cCondTimeout; // 0;
}
stCoCond_t *co_cond_alloc()
{
Expand Down
8 changes: 6 additions & 2 deletions co_routine.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <stdint.h>
#include <sys/poll.h>
#include <pthread.h>
#include <functional>

//1.struct

Expand All @@ -41,11 +42,12 @@ struct stCoRoutineAttr_t

struct stCoEpoll_t;
typedef int (*pfn_co_eventloop_t)(void *);
typedef void *(*pfn_co_routine_t)( void * );
//typedef void *(*pfn_co_routine_t)( void * );
typedef std::function<void* (void*)> pfn_co_routine_t;

//2.co_routine

int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg );
int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr, pfn_co_routine_t pfn,void *arg );
void co_resume( stCoRoutine_t *co );
void co_yield( stCoRoutine_t *co );
void co_yield_ct(); //ct = current thread
Expand All @@ -59,6 +61,8 @@ void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );

//3.specific

int co_create_specific( pthread_key_t* key );
int co_delete_specific( pthread_key_t key );
int co_setspecific( pthread_key_t key, const void *value );
void * co_getspecific( pthread_key_t key );

Expand Down
2 changes: 2 additions & 0 deletions co_routine_inner.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct stCoRoutine_t
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
char cCondTimeout;

void *pvEnv;

Expand Down Expand Up @@ -103,6 +104,7 @@ void FreeEpoll( stCoEpoll_t *ctx );

stCoRoutine_t * GetCurrThreadCo();
void SetEpoll( stCoRoutineEnv_t *env,stCoEpoll_t *ev );
void RecreateEpollFd( stCoRoutineEnv_t *env );

typedef void (*pfnCoRoutineFunc_t)();

Expand Down
Loading