From 854bab24b2cc6ddecea696de0e397b2fa4bdc470 Mon Sep 17 00:00:00 2001 From: microcai Date: Mon, 16 Dec 2024 20:59:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=81=E8=AE=B8=20co=5Fcreate=20=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E4=BB=BB=E6=84=8F=E5=A4=9A=E5=8F=82=E6=95=B0=E7=9A=84?= =?UTF-8?q?=E5=85=A5=E5=8F=A3=E5=87=BD=E6=95=B0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 + co_routine.cpp | 83 ---------------------------- co_routine.h | 122 +++++++++++++++++++++++++++++++++++++++++- example_cond.cpp | 6 +-- example_copystack.cpp | 7 ++- example_echocli.cpp | 7 ++- example_echosvr.cpp | 22 ++++---- example_poll.cpp | 10 ++-- example_setenv.cpp | 4 +- example_specific.cpp | 5 +- 10 files changed, 146 insertions(+), 121 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 04399a5..e1d2bc0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ enable_language(C ASM) # Add source files set(SOURCE_FILES + co_comm.cpp co_epoll.cpp co_hook_sys_call.cpp co_routine.cpp diff --git a/co_routine.cpp b/co_routine.cpp index 352ae7e..9a29ff8 100644 --- a/co_routine.cpp +++ b/co_routine.cpp @@ -265,16 +265,6 @@ void inline Join( TLink*apLink,TLink *apOther ) apOther->head = apOther->tail = NULL; } -/////////////////for copy stack ////////////////////////// -stStackMem_t* co_alloc_stackmem(unsigned int stack_size) -{ - stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t)); - stack_mem->occupy_co= NULL; - stack_mem->stack_size = stack_size; - stack_mem->stack_buffer = (char*)malloc(stack_size); - stack_mem->stack_bp = stack_mem->stack_buffer + stack_size; - return stack_mem; -} stShareStack_t* co_alloc_sharestack(int count, int stack_size) { @@ -293,17 +283,6 @@ stShareStack_t* co_alloc_sharestack(int count, int stack_size) return share_stack; } -static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack) -{ - if (!share_stack) - { - return NULL; - } - int idx = share_stack->alloc_idx % share_stack->count; - share_stack->alloc_idx++; - - return share_stack->stack_array[idx]; -} // ---------------------------------------------------------------------------- @@ -456,68 +435,6 @@ static int CoRoutineFunc( stCoRoutine_t *co,void * ) return 0; } - - -struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, - pfn_co_routine_t pfn,void *arg ) -{ - - stCoRoutineAttr_t at; - if( attr ) - { - memcpy( &at,attr,sizeof(at) ); - } - if( at.stack_size <= 0 ) - { - at.stack_size = 128 * 1024; - } - else if( at.stack_size > 1024 * 1024 * 8 ) - { - at.stack_size = 1024 * 1024 * 8; - } - - if( at.stack_size & 0xFFF ) - { - at.stack_size &= ~0xFFF; - at.stack_size += 0x1000; - } - - stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); - - memset( lp,0,(long)(sizeof(stCoRoutine_t))); - - - lp->env = env; - lp->pfn = pfn; - lp->arg = arg; - - stStackMem_t* stack_mem = NULL; - if( at.share_stack ) - { - stack_mem = co_get_stackmem( at.share_stack); - at.stack_size = at.share_stack->stack_size; - } - else - { - stack_mem = co_alloc_stackmem(at.stack_size); - } - lp->stack_mem = stack_mem; - - lp->ctx.ss_sp = stack_mem->stack_buffer; - lp->ctx.ss_size = at.stack_size; - - lp->cStart = 0; - lp->cEnd = 0; - lp->cIsMain = 0; - lp->cEnableSysHook = 0; - lp->cIsShareStack = at.share_stack != NULL; - - lp->save_size = 0; - lp->save_buffer = NULL; - - return lp; -} - int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) { if( !co_get_curr_thread_env() ) diff --git a/co_routine.h b/co_routine.h index d6f4789..a1edf85 100644 --- a/co_routine.h +++ b/co_routine.h @@ -22,7 +22,9 @@ #include #include #include - +#include +#include +#include //1.struct struct stCoRoutine_t; @@ -43,9 +45,127 @@ struct stCoEpoll_t; typedef int (*pfn_co_eventloop_t)(void *); typedef void *(*pfn_co_routine_t)( void * ); +#include "co_routine_inner.h" + //2.co_routine +static inline stStackMem_t* co_get_stackmem(stShareStack_t* share_stack) +{ + if (!share_stack) + { + return NULL; + } + int idx = share_stack->alloc_idx % share_stack->count; + share_stack->alloc_idx++; + + return share_stack->stack_array[idx]; +} + +/////////////////for copy stack ////////////////////////// +inline stStackMem_t* co_alloc_stackmem(unsigned int stack_size) +{ + stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t)); + stack_mem->occupy_co= NULL; + stack_mem->stack_size = stack_size; + stack_mem->stack_buffer = (char*)malloc(stack_size); + stack_mem->stack_bp = stack_mem->stack_buffer + stack_size; + return stack_mem; +} + +inline struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, + pfn_co_routine_t pfn,void *arg ) +{ + + stCoRoutineAttr_t at; + if( attr ) + { + memcpy( &at,attr,sizeof(at) ); + } + if( at.stack_size <= 0 ) + { + at.stack_size = 128 * 1024; + } + else if( at.stack_size > 1024 * 1024 * 8 ) + { + at.stack_size = 1024 * 1024 * 8; + } + + if( at.stack_size & 0xFFF ) + { + at.stack_size &= ~0xFFF; + at.stack_size += 0x1000; + } + + stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); + + memset( lp,0,(long)(sizeof(stCoRoutine_t))); + + + lp->env = env; + lp->pfn = pfn; + lp->arg = arg; + + stStackMem_t* stack_mem = NULL; + if( at.share_stack ) + { + stack_mem = co_get_stackmem( at.share_stack); + at.stack_size = at.share_stack->stack_size; + } + else + { + stack_mem = co_alloc_stackmem(at.stack_size); + } + lp->stack_mem = stack_mem; + + lp->ctx.ss_sp = stack_mem->stack_buffer; + lp->ctx.ss_size = at.stack_size; + + lp->cStart = 0; + lp->cEnd = 0; + lp->cIsMain = 0; + lp->cEnableSysHook = 0; + lp->cIsShareStack = at.share_stack != NULL; + + lp->save_size = 0; + lp->save_buffer = NULL; + + return lp; +} + int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg ); + + +// microcai 添加 +template +int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,void *(*pfn)(Args...), Args... args) +{ + + if( !co_get_curr_thread_env() ) + { + co_init_curr_thread_env(); + } + + struct new_func_arg_pack { + decltype(pfn) real_func_ptr; + std::tuple args; + }; + + auto new_arg = new new_func_arg_pack { pfn, std::tuple{std::forward(args)...} }; + + auto wrapper_func = [](void* arg_pack) + { + auto converted_arg_pack =(new_func_arg_pack*)(arg_pack); + + std::unique_ptr auto_delete(converted_arg_pack); + + return std::apply(converted_arg_pack->real_func_ptr, std::move(converted_arg_pack->args)); + }; + + stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, wrapper_func, new_arg ); + *ppco = co; + return 0; +} + void co_resume( stCoRoutine_t *co ); void co_yield( stCoRoutine_t *co ); void co_yield_ct(); //ct = current thread diff --git a/example_cond.cpp b/example_cond.cpp index 5262568..fda339e 100644 --- a/example_cond.cpp +++ b/example_cond.cpp @@ -31,10 +31,9 @@ struct stEnv_t stCoCond_t* cond; queue task_queue; }; -void* Producer(void* args) +void* Producer(stEnv_t* env) { co_enable_hook_sys(); - stEnv_t* env= (stEnv_t*)args; int id = 0; while (true) { @@ -47,10 +46,9 @@ void* Producer(void* args) } return NULL; } -void* Consumer(void* args) +void* Consumer(stEnv_t* env) { co_enable_hook_sys(); - stEnv_t* env = (stEnv_t*)args; while (true) { if (env->task_queue.empty()) diff --git a/example_copystack.cpp b/example_copystack.cpp index 92062a6..fa59976 100644 --- a/example_copystack.cpp +++ b/example_copystack.cpp @@ -26,14 +26,13 @@ #include "co_routine.h" #include "co_routine_inner.h" -void* RoutineFunc(void* args) +void* RoutineFunc(int routineid) { co_enable_hook_sys(); - int* routineid = (int*)args; while (true) { char sBuff[128]; - sprintf(sBuff, "from routineid %d stack addr %p\n", *routineid, sBuff); + sprintf(sBuff, "from routineid %d stack addr %p\n", routineid, sBuff); printf("%s", sBuff); poll(NULL, 0, 1000); //sleep 1s @@ -53,7 +52,7 @@ int main() for (int i = 0; i < 2; i++) { routineid[i] = i; - co_create(&co[i], &attr, RoutineFunc, routineid + i); + co_create(&co[i], &attr, RoutineFunc, routineid[i]); co_resume(co[i]); } co_eventloop(co_get_epoll_ct(), NULL, NULL); diff --git a/example_echocli.cpp b/example_echocli.cpp index 083c1e7..13e1a1c 100644 --- a/example_echocli.cpp +++ b/example_echocli.cpp @@ -98,12 +98,11 @@ void AddFailCnt() } } -static void *readwrite_routine( void *arg ) +static void *readwrite_routine( stEndPoint endpoint ) { co_enable_hook_sys(); - stEndPoint *endpoint = (stEndPoint *)arg; char str[8]="sarlmol"; char buf[ 1024 * 16 ]; int fd = -1; @@ -114,7 +113,7 @@ static void *readwrite_routine( void *arg ) { fd = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in addr; - SetAddr(endpoint->ip, endpoint->port, addr); + SetAddr(endpoint.ip, endpoint.port, addr); ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr)); if ( errno == EALREADY || errno == EINPROGRESS ) @@ -206,7 +205,7 @@ int main(int argc,char *argv[]) for(int i=0;i g_readwrite; -static int g_listen_fd = -1; static int SetNonBlock(int iSock) { int iFlags; @@ -61,12 +60,11 @@ static int SetNonBlock(int iSock) return ret; } -static void *readwrite_routine( void *arg ) +static void *readwrite_routine( task_t *co ) { co_enable_hook_sys(); - task_t *co = (task_t*)arg; char buf[ 1024 * 16 ]; for(;;) { @@ -104,7 +102,7 @@ static void *readwrite_routine( void *arg ) return 0; } int co_accept(int fd, struct sockaddr *addr, socklen_t *len ); -static void *accept_routine( void * ) +static void *accept_routine(int listen_fd ) { co_enable_hook_sys(); printf("accept_routine\n"); @@ -126,11 +124,11 @@ static void *accept_routine( void * ) memset( &addr,0,sizeof(addr) ); socklen_t len = sizeof(addr); - int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len); + int fd = co_accept(listen_fd, (struct sockaddr *)&addr, &len); if( fd < 0 ) { struct pollfd pf = { 0 }; - pf.fd = g_listen_fd; + pf.fd = listen_fd; pf.events = (POLLIN|POLLERR|POLLHUP); co_poll( co_get_epoll_ct(),&pf,1,1000 ); continue; @@ -210,15 +208,15 @@ int main(int argc,char *argv[]) int proccnt = atoi( argv[4] ); bool deamonize = argc >= 6 && strcmp(argv[5], "-d") == 0; - g_listen_fd = CreateTcpSocket( port,ip,true ); - listen( g_listen_fd,1024 ); - if(g_listen_fd==-1){ + int listen_fd = CreateTcpSocket( port,ip,true ); + listen( listen_fd,1024 ); + if(listen_fd==-1){ printf("Port %d is in use\n", port); return -1; } - printf("listen %d %s:%d\n",g_listen_fd,ip,port); + printf("listen %d %s:%d\n",listen_fd,ip,port); - SetNonBlock( g_listen_fd ); + SetNonBlock( listen_fd ); for(int k=0;k v ) { co_enable_hook_sys(); - vector &v = *(vector*)arg; for(size_t i=0;i v2 = v; - poll_routine( &v2 ); + poll_routine( v ); printf("--------------------- routine -------------------\n"); for(int i=0;i<10;i++) { stCoRoutine_t *co = 0; - vector *v2 = new vector(); - *v2 = v; - co_create( &co,NULL,poll_routine,v2 ); + co_create( &co,NULL,poll_routine, v ); printf("routine i %d\n",i); co_resume( co ); } diff --git a/example_setenv.cpp b/example_setenv.cpp index 520cfb6..6413fff 100644 --- a/example_setenv.cpp +++ b/example_setenv.cpp @@ -62,12 +62,10 @@ void SetAndGetEnv(int iRoutineID) printf("routineid %d get env CGINAME %s\n", iRoutineID, env); } -void* RoutineFunc(void* args) +void* RoutineFunc(stRoutineArgs_t* g) { co_enable_hook_sys(); - stRoutineArgs_t* g = (stRoutineArgs_t*)args; - SetAndGetEnv(g->iRoutineID); return NULL; } diff --git a/example_specific.cpp b/example_specific.cpp index 5d20a8b..b5f5e37 100644 --- a/example_specific.cpp +++ b/example_specific.cpp @@ -35,10 +35,9 @@ struct stRoutineSpecificData_t CO_ROUTINE_SPECIFIC(stRoutineSpecificData_t, __routine); -void* RoutineFunc(void* args) +void* RoutineFunc(stRoutineArgs_t* routine_args) { co_enable_hook_sys(); - stRoutineArgs_t* routine_args = (stRoutineArgs_t*)args; __routine->idx = routine_args->routine_id; while (true) { @@ -53,7 +52,7 @@ int main() for (int i = 0; i < 10; i++) { args[i].routine_id = i; - co_create(&args[i].co, NULL, RoutineFunc, (void*)&args[i]); + co_create(&args[i].co, NULL, RoutineFunc, &args[i]); co_resume(args[i].co); } co_eventloop(co_get_epoll_ct(), NULL, NULL);