Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enable_language(C ASM)

# Add source files
set(SOURCE_FILES
co_comm.cpp
co_epoll.cpp
co_hook_sys_call.cpp
co_routine.cpp
Expand Down
83 changes: 0 additions & 83 deletions co_routine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,6 @@ void inline Join( TLink*apLink,TLink *apOther )
apOther->head = apOther->tail = NULL;
}

/////////////////for copy stack //////////////////////////
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}

stShareStack_t* co_alloc_sharestack(int count, int stack_size)
{
Expand All @@ -293,17 +283,6 @@ stShareStack_t* co_alloc_sharestack(int count, int stack_size)
return share_stack;
}

static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{
if (!share_stack)
{
return NULL;
}
int idx = share_stack->alloc_idx % share_stack->count;
share_stack->alloc_idx++;

return share_stack->stack_array[idx];
}


// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -456,68 +435,6 @@ static int CoRoutineFunc( stCoRoutine_t *co,void * )
return 0;
}



struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{

stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}

if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}

stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );

memset( lp,0,(long)(sizeof(stCoRoutine_t)));


lp->env = env;
lp->pfn = pfn;
lp->arg = arg;

stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;

lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;

lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;

lp->save_size = 0;
lp->save_buffer = NULL;

return lp;
}

int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
Expand Down
122 changes: 121 additions & 1 deletion co_routine.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <stdint.h>
#include <sys/poll.h>
#include <pthread.h>

#include <string.h>
#include <tuple>
#include <memory>
//1.struct

struct stCoRoutine_t;
Expand All @@ -43,9 +45,127 @@ struct stCoEpoll_t;
typedef int (*pfn_co_eventloop_t)(void *);
typedef void *(*pfn_co_routine_t)( void * );

#include "co_routine_inner.h"

//2.co_routine

static inline stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{
if (!share_stack)
{
return NULL;
}
int idx = share_stack->alloc_idx % share_stack->count;
share_stack->alloc_idx++;

return share_stack->stack_array[idx];
}

/////////////////for copy stack //////////////////////////
inline stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}

inline struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{

stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}

if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}

stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );

memset( lp,0,(long)(sizeof(stCoRoutine_t)));


lp->env = env;
lp->pfn = pfn;
lp->arg = arg;

stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;

lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;

lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;

lp->save_size = 0;
lp->save_buffer = NULL;

return lp;
}

int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg );


// microcai 添加
template <typename... Args>
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,void *(*pfn)(Args...), Args... args)
{

if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}

struct new_func_arg_pack {
decltype(pfn) real_func_ptr;
std::tuple<Args...> args;
};

auto new_arg = new new_func_arg_pack { pfn, std::tuple<Args...>{std::forward<Args>(args)...} };

auto wrapper_func = [](void* arg_pack)
{
auto converted_arg_pack =(new_func_arg_pack*)(arg_pack);

std::unique_ptr<new_func_arg_pack> auto_delete(converted_arg_pack);

return std::apply(converted_arg_pack->real_func_ptr, std::move(converted_arg_pack->args));
};

stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, wrapper_func, new_arg );
*ppco = co;
return 0;
}

void co_resume( stCoRoutine_t *co );
void co_yield( stCoRoutine_t *co );
void co_yield_ct(); //ct = current thread
Expand Down
6 changes: 2 additions & 4 deletions example_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ struct stEnv_t
stCoCond_t* cond;
queue<stTask_t*> task_queue;
};
void* Producer(void* args)
void* Producer(stEnv_t* env)
{
co_enable_hook_sys();
stEnv_t* env= (stEnv_t*)args;
int id = 0;
while (true)
{
Expand All @@ -47,10 +46,9 @@ void* Producer(void* args)
}
return NULL;
}
void* Consumer(void* args)
void* Consumer(stEnv_t* env)
{
co_enable_hook_sys();
stEnv_t* env = (stEnv_t*)args;
while (true)
{
if (env->task_queue.empty())
Expand Down
7 changes: 3 additions & 4 deletions example_copystack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
#include "co_routine.h"
#include "co_routine_inner.h"

void* RoutineFunc(void* args)
void* RoutineFunc(int routineid)
{
co_enable_hook_sys();
int* routineid = (int*)args;
while (true)
{
char sBuff[128];
sprintf(sBuff, "from routineid %d stack addr %p\n", *routineid, sBuff);
sprintf(sBuff, "from routineid %d stack addr %p\n", routineid, sBuff);

printf("%s", sBuff);
poll(NULL, 0, 1000); //sleep 1s
Expand All @@ -53,7 +52,7 @@ int main()
for (int i = 0; i < 2; i++)
{
routineid[i] = i;
co_create(&co[i], &attr, RoutineFunc, routineid + i);
co_create(&co[i], &attr, RoutineFunc, routineid[i]);
co_resume(co[i]);
}
co_eventloop(co_get_epoll_ct(), NULL, NULL);
Expand Down
7 changes: 3 additions & 4 deletions example_echocli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,11 @@ void AddFailCnt()
}
}

static void *readwrite_routine( void *arg )
static void *readwrite_routine( stEndPoint endpoint )
{

co_enable_hook_sys();

stEndPoint *endpoint = (stEndPoint *)arg;
char str[8]="sarlmol";
char buf[ 1024 * 16 ];
int fd = -1;
Expand All @@ -114,7 +113,7 @@ static void *readwrite_routine( void *arg )
{
fd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
SetAddr(endpoint->ip, endpoint->port, addr);
SetAddr(endpoint.ip, endpoint.port, addr);
ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));

if ( errno == EALREADY || errno == EINPROGRESS )
Expand Down Expand Up @@ -206,7 +205,7 @@ int main(int argc,char *argv[])
for(int i=0;i<cnt;i++)
{
stCoRoutine_t *co = 0;
co_create( &co,NULL,readwrite_routine, &endpoint);
co_create( &co,NULL,readwrite_routine, endpoint);
co_resume( co );
}
co_eventloop( co_get_epoll_ct(),0,0 );
Expand Down
Loading