From e94c699938f320086bc8b77e69131bfa4add0896 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Wed, 6 Dec 2023 09:15:48 +0000 Subject: [PATCH 01/15] initial checkin --- src/collectives/all_reduce.cc | 2 +- src/collectives/device/prims_ll.h | 3 ++ src/collectives/device/prims_ll128.h | 3 ++ src/collectives/device/prims_simple.h | 3 ++ src/include/msccl/msccl_scheduler.h | 3 +- src/misc/msccl/msccl_lifecycle.cc | 34 +++++++++----- src/proxy.cc | 68 ++++++++++++++++++++++++++- src/transport/net_ib.cc | 10 +++- 8 files changed, 110 insertions(+), 16 deletions(-) diff --git a/src/collectives/all_reduce.cc b/src/collectives/all_reduce.cc index 365c73c..ed27152 100644 --- a/src/collectives/all_reduce.cc +++ b/src/collectives/all_reduce.cc @@ -26,7 +26,7 @@ ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, }; NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op}; NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload) - + INFO(NCCL_INIT, "MSCCL: ncclAllReduce Entered"); if (mscclAvailable() && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, diff --git a/src/collectives/device/prims_ll.h b/src/collectives/device/prims_ll.h index 3c4355f..acc2c87 100644 --- a/src/collectives/device/prims_ll.h +++ b/src/collectives/device/prims_ll.h @@ -8,6 +8,8 @@ #include "npkit/npkit.h" #endif +#include + template class Primitives: public PrimitivesWithoutDirect> { @@ -75,6 +77,7 @@ class Primitives: inline __device__ int checkAbort(int &spins, int send) { spins++; if (abort == 0 && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { + printf("checkAbort LL, abortFlag:%u \n", *ncclShmem.comm.abortFlag); abort = *ncclShmem.comm.abortFlag; spins = 0; } diff --git a/src/collectives/device/prims_ll128.h b/src/collectives/device/prims_ll128.h index 6b906ce..afc9c3e 100644 --- a/src/collectives/device/prims_ll128.h +++ b/src/collectives/device/prims_ll128.h @@ -11,6 +11,8 @@ #include "npkit/npkit.h" #endif +#include + #define NCCL_LL128_FLAGTHREAD (NCCL_LL128_LINEELEMS-1) template @@ -79,6 +81,7 @@ class Primitives: inline __device__ int checkAbort(int &spins, int i, int send) { spins++; if (abort == 0 && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { + printf("checkAbort LL128, abortFlag:%u \n", *ncclShmem.comm.abortFlag); abort = *ncclShmem.comm.abortFlag; spins = 0; } diff --git a/src/collectives/device/prims_simple.h b/src/collectives/device/prims_simple.h index b39dcb6..7cd99c0 100644 --- a/src/collectives/device/prims_simple.h +++ b/src/collectives/device/prims_simple.h @@ -11,6 +11,8 @@ #include "npkit/npkit.h" #endif +#include + template class Primitives< @@ -111,6 +113,7 @@ class Primitives< inline __device__ bool checkAbort(int &spins) { spins++; if (!(flags & Aborted) && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { + printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); if (*ncclShmem.comm.abortFlag) { flags |= Aborted; ncclShmem.aborted = 1; diff --git a/src/include/msccl/msccl_scheduler.h b/src/include/msccl/msccl_scheduler.h index 1dd9f6b..4b881ef 100644 --- a/src/include/msccl/msccl_scheduler.h +++ b/src/include/msccl/msccl_scheduler.h @@ -36,13 +36,14 @@ struct mscclSchedulerParam { int nRanks; bool scheduled; mscclAlgoHandle_t handle; + bool repair; }; typedef struct { // Name of the scheduler (mainly for logs) const char* name; // Load all algorithms - ncclResult_t (*init)(); + ncclResult_t (*init)(ncclComm_t comm); // Select an algorithm ncclResult_t (*selectAlgo)(struct mscclSchedulerParam* param); // Unload all algorithms diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 9ec9695..65f27cc 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -26,6 +26,7 @@ NCCL_PARAM(MscclEnabled, "MSCCL_ENABLE", 1); static std::atomic mscclInitialized; static bool mscclSchedulerTriedLoadAlgo = false; static std::mutex mscclLifecycleMutex; +extern int nicfailure; int getEnvInt(const char* env, int64_t deftVal) { char* str = getenv(env); @@ -154,7 +155,7 @@ static ncclResult_t mscclInternalSchedulerInit() { return ncclSuccess; } -static ncclResult_t mscclSchedulerInit() { +static ncclResult_t mscclSchedulerInit(ncclComm_t comm) { mscclStatus& status = mscclGetStatus(); bool useInternalScheduler = false; @@ -177,7 +178,7 @@ static ncclResult_t mscclSchedulerInit() { if (useInternalScheduler) { NCCLCHECK(mscclInternalSchedulerInit()); } else { - NCCLCHECK(status.mscclSchedulerPtr->init()); + NCCLCHECK(status.mscclSchedulerPtr->init(comm)); } return ncclSuccess; } @@ -217,7 +218,7 @@ ncclResult_t mscclInit(ncclComm_t comm) { status.needsProxy = false; mscclSchedulerTriedLoadAlgo = false; - NCCLCHECK(mscclSchedulerInit()); + NCCLCHECK(mscclSchedulerInit(comm)); mscclInitialized.store(true, std::memory_order_release); } @@ -312,7 +313,7 @@ static ncclResult_t mscclSetSavedSchedulerParam( const void* sendBuff, const size_t sendCounts[], const size_t sDisPls[], void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, - mscclFunc_t func, ncclComm_t comm, cudaStream_t stream, + mscclFunc_t func, ncclComm_t comm, cudaStream_t stream, bool repair, struct mscclSavedSchedulerParam* param) { param->p.sendBuff = sendBuff; param->p.sendCounts = sendCounts; @@ -328,6 +329,7 @@ static ncclResult_t mscclSetSavedSchedulerParam( param->p.func = func; param->p.rank = comm->rank; param->p.nRanks = comm->nRanks; + param->p.repair = repair; param->comm = comm; param->stream = stream; return ncclSuccess; @@ -413,27 +415,37 @@ ncclResult_t mscclEnqueueCheck( void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, mscclFunc_t func, ncclComm_t comm, cudaStream_t stream) { + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup"); mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); + bool repair = false; + + if (*comm->abortFlag) + { + *comm->abortFlag=0; + repair = true; + } threadLocalStatus.savedSchedulerParams.push_back({}); NCCLCHECK(mscclSetSavedSchedulerParam( sendBuff, sendCounts, sDisPls, recvBuff, recvCounts, rDisPls, - count, dataType, root, peer, op, func, comm, stream, + count, dataType, root, peer, op, func, comm, stream, repair, &threadLocalStatus.savedSchedulerParams.back())); switch (threadLocalStatus.groupStatus) { case mscclNoGroup: - if (comm->mscclCompatible) { - NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); - if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { - NCCLCHECK(mscclRunSavedParams()); - break; - } + if (comm->mscclCompatible) { + INFO(NCCL_INIT, "MSCCL: mscclEnqueueCheck mscclNoGroup, com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); + NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); + if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { + NCCLCHECK(mscclRunSavedParams()); + break; + } } NCCLCHECK(mscclFallBackSavedParams()); break; case mscclGroupSupportedOp: if (comm->mscclCompatible) { + INFO(NCCL_INIT, "MSCCL: mscclEnqueueCheck mscclGroupSupportedOp, com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { // Only save counts and displs when there is suitable MSCCL algorithm for this diff --git a/src/proxy.cc b/src/proxy.cc index c16ebd2..b2d6e99 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -8,6 +8,7 @@ #include "comm.h" #include "info.h" #include "collectives.h" +#include "bootstrap.h" #include "socket.h" #include "shm.h" #include "profiler.h" @@ -860,6 +861,7 @@ void* ncclProxyProgress(void *proxyState_) { ncclResult_t ret = progressOps(proxyState, state, state->active, &idle); if (ret != ncclSuccess) { INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); + //*proxyState->abortFlag = 1; return NULL; } if (lastIdle == 0 && idle == 1) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileIdle); @@ -1102,6 +1104,8 @@ ncclResult_t ncclProxyCallAsync(struct ncclComm* comm, struct ncclProxyConnector struct ncclSocket* sock; ncclResult_t ret = ncclSuccess; struct ncclProxyState* sharedProxyState = comm->proxyState; + + WARN("ncclProxyCallAsync() called"); if (sharedProxyState->peerSocks == NULL) return ncclInternalError; @@ -1383,7 +1387,7 @@ static ncclResult_t proxyServiceInitOp(int type, struct ncclProxyLocalPeer* peer } #include - +extern int nicfailure; static bool proxyMatchOpType(int type) { switch (type) { case ncclProxyMsgInit: @@ -1430,11 +1434,27 @@ void* ncclProxyService(void* _args) { int npeers = 0; int stop = 0; int asyncOpCount = 0; + int nloopcount = 0; while (stop == 0 || (stop == 1 && npeers > 0)) { /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer * connections. Need to wait until all other related comms call abort and safely exit * together, or we could face segmentation fault. */ - if (*proxyState->abortFlag != 0) stop = 1; + nloopcount++; + // if (nicfailure) + // { + // *proxyState->abortFlag = 1; + // WARN("[Proxy Service] detected nic failure, change abort flag to 1"); + // } + // else + // { + // WARN("[Proxy Service] not able to detected the nic failure"); + // } + WARN("[Proxy Service] still in service loop %d", nloopcount); + if (*proxyState->abortFlag != 0) + { + stop = 1; + WARN("[Proxy Service] Received abort flag!"); + } /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ int ret; do { @@ -1538,6 +1558,7 @@ void* ncclProxyService(void* _args) { } // Wait for all operations to complete and stop progress thread before freeing any resource + WARN("[Proxy Service] will destory soon!"); if (ncclProxyProgressDestroy(proxyState) != ncclSuccess) { WARN("[Proxy Service] proxyDestroy failed"); } @@ -1551,6 +1572,47 @@ void* ncclProxyService(void* _args) { return NULL; } +void* ncclProxyServiceDaemon(void* _args) { + ncclComm *comm = (ncclComm*)_args; + WARN("[Proxy Service] start the ncclProxyServiceDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); + int* status = NULL; + status = new int[comm->nRanks]; + for (int i = 0; i < comm->nRanks; ++i) { + status[i] = 0; + } + + while(1) + { + status[comm->rank] = nicfailure; + if (nicfailure) + { + WARN("[Proxy Service] ncclProxyServiceDaemon, rank: %d detect the nic failure, will start to use allgather to notify others: %d", comm->rank, nicfailure); + } + bootstrapAllGather(comm->bootstrap, status, sizeof(int)); + int all_status = 0; + for (int i = 0; i < comm->nRanks; ++i) { + all_status |= status[i]; + } + + WARN("[Proxy Service] ncclProxyServiceDaemon, finished allgather, all_status: %d", all_status); + + if (all_status != 0) + { + WARN("[Proxy Service] ncclProxyServiceDaemon, detect the nic failure, will step the proxy service now"); + *comm->abortFlag=1; + ncclProxyStop(comm); + break; + } + else + { + WARN("[Proxy Service] ncclProxyServiceDaemon, not detect the nic failure, will check it again later"); + sleep(5); + } + } + WARN("[Proxy Service] will quit the ncclProxyServiceDaemon now"); + return NULL; +} + ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union ncclSocketAddress* peerAddresses) { assert(comm->sharedRes->proxyState == NULL); NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1)); @@ -1581,8 +1643,10 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->ncclCollNet = comm->ncclCollNet; memcpy(proxyState->buffSizes, comm->buffSizes, sizeof(comm->buffSizes)); + pthread_t thread; pthread_create(&comm->proxyState->thread, NULL, ncclProxyService, comm->proxyState); ncclSetThreadName(comm->proxyState->thread, "NCCL Service %2d", comm->cudaDev); + pthread_create(&thread, NULL, ncclProxyServiceDaemon, comm); } return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 861fa57..fb2ea34 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -28,6 +28,7 @@ #define MAXNAMESIZE 64 static char ncclIbIfName[MAX_IF_NAME_SIZE+1]; static union ncclSocketAddress ncclIbIfAddr; +int nicfailure = 0; struct ncclIbMr { uintptr_t addr; @@ -92,7 +93,14 @@ static void* ncclIbAsyncThreadMain(void* args) { char *str; if (ncclSuccess != wrap_ibv_event_type_str(&str, event.event_type)) { break; } if (event.event_type != IBV_EVENT_COMM_EST) - WARN("NET/IB : Got async event : %s", str); + { + WARN("NET/IB : Got async event : %s, event type: %d", str, event.event_type); + if (strcmp(str, "local catastrophic error") == 0) { + WARN("NET/IB : Detect Nic failure, will repaire soon, event type: %d", event.event_type); + nicfailure = 1; + break; + } + } if (ncclSuccess != wrap_ibv_ack_async_event(&event)) { break; } } return NULL; From e04f7f8efd5d7779add22ac29424e5409fff160e Mon Sep 17 00:00:00 2001 From: Andy Li Date: Fri, 8 Dec 2023 16:25:42 +0000 Subject: [PATCH 02/15] expose bootstrap api --- src/bootstrap.cc | 9 +++++++++ src/collectives/all_reduce.cc | 1 - src/include/bootstrap.h | 11 +++++++++++ src/misc/msccl/msccl_lifecycle.cc | 4 +--- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/bootstrap.cc b/src/bootstrap.cc index 764cb6c..4e831d1 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -13,6 +13,8 @@ #include #include "proxy.h" +#define NCCL_BOOTSTRAP_NAME "github.com/Azure/msccl-executor/bootstrap" + struct bootstrapRootArgs { struct ncclSocket* listenSock; uint64_t magic; @@ -591,3 +593,10 @@ ncclResult_t bootstrapAbort(void* commState) { free(state); return ncclSuccess; } + +__attribute__((visibility("default"))) ncclBootstrapInterface ncclBootstrap = { + .name = NCCL_BOOTSTRAP_NAME, + .send = bootstrapSend, + .receive = bootstrapRecv, + .allgather = bootstrapAllGather, +}; diff --git a/src/collectives/all_reduce.cc b/src/collectives/all_reduce.cc index ed27152..a4c8f96 100644 --- a/src/collectives/all_reduce.cc +++ b/src/collectives/all_reduce.cc @@ -26,7 +26,6 @@ ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, }; NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op}; NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload) - INFO(NCCL_INIT, "MSCCL: ncclAllReduce Entered"); if (mscclAvailable() && !mscclIsCaller()) { return mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index 400a479..54f7375 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -29,4 +29,15 @@ ncclResult_t bootstrapIntraNodeAllGather(void* commState, int *ranks, int rank, ncclResult_t bootstrapIntraNodeBroadcast(void* commState, int *ranks, int rank, int nranks, int root, void* bcastData, int size); ncclResult_t bootstrapClose(void* commState); ncclResult_t bootstrapAbort(void* commState); + +typedef struct { + // Name of the Bootstrap (mainly for logs) + const char* name; + // send operation + ncclResult_t (*send)(void* commState, int peer, int tag, void* data, int size); + // receive operation + ncclResult_t (*receive)(void* commState, int peer, int tag, void* data, int size); + // allgather operation + ncclResult_t (*allgather)(void* commState, void* allData, int size); +} ncclBootstrapInterface; #endif diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 65f27cc..bb24296 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -415,7 +415,7 @@ ncclResult_t mscclEnqueueCheck( void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, mscclFunc_t func, ncclComm_t comm, cudaStream_t stream) { - INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup"); + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); bool repair = false; @@ -434,7 +434,6 @@ ncclResult_t mscclEnqueueCheck( switch (threadLocalStatus.groupStatus) { case mscclNoGroup: if (comm->mscclCompatible) { - INFO(NCCL_INIT, "MSCCL: mscclEnqueueCheck mscclNoGroup, com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { NCCLCHECK(mscclRunSavedParams()); @@ -445,7 +444,6 @@ ncclResult_t mscclEnqueueCheck( break; case mscclGroupSupportedOp: if (comm->mscclCompatible) { - INFO(NCCL_INIT, "MSCCL: mscclEnqueueCheck mscclGroupSupportedOp, com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { // Only save counts and displs when there is suitable MSCCL algorithm for this From bcd4090da253e8a3b9e56021012c52aa74030065 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Mon, 11 Dec 2023 10:36:50 +0000 Subject: [PATCH 03/15] clean up code --- src/include/msccl/msccl_scheduler.h | 1 + src/include/nccl_net.h | 4 ++++ src/misc/msccl/msccl_lifecycle.cc | 21 +++++++++++++++---- src/proxy.cc | 31 +++++++---------------------- src/transport/net_ib.cc | 18 ++++++++++++++--- src/transport/net_socket.cc | 10 ++++++++++ 6 files changed, 54 insertions(+), 31 deletions(-) diff --git a/src/include/msccl/msccl_scheduler.h b/src/include/msccl/msccl_scheduler.h index 4b881ef..5a9b74c 100644 --- a/src/include/msccl/msccl_scheduler.h +++ b/src/include/msccl/msccl_scheduler.h @@ -37,6 +37,7 @@ struct mscclSchedulerParam { bool scheduled; mscclAlgoHandle_t handle; bool repair; + ncclComm_t comm; }; typedef struct { diff --git a/src/include/nccl_net.h b/src/include/nccl_net.h index a387e66..387b52b 100644 --- a/src/include/nccl_net.h +++ b/src/include/nccl_net.h @@ -48,6 +48,10 @@ typedef struct { ncclResult_t (*devices)(int* ndev); // Get various device properties. ncclResult_t (*getProperties)(int dev, ncclNetProperties_v6_t* props); + // Get device status. + ncclResult_t (*getStatus)(int* nstat); + // Get various device properties. + ncclResult_t (*setStatus)(int nstat); // Create a receiving object and provide a handle to connect to it. The // handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged // between ranks to create a connection. diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index bb24296..7a543c4 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -26,7 +26,7 @@ NCCL_PARAM(MscclEnabled, "MSCCL_ENABLE", 1); static std::atomic mscclInitialized; static bool mscclSchedulerTriedLoadAlgo = false; static std::mutex mscclLifecycleMutex; -extern int nicfailure; +extern ncclNet_t ncclNetIb; int getEnvInt(const char* env, int64_t deftVal) { char* str = getenv(env); @@ -330,6 +330,7 @@ static ncclResult_t mscclSetSavedSchedulerParam( param->p.rank = comm->rank; param->p.nRanks = comm->nRanks; param->p.repair = repair; + param->p.comm = comm; param->comm = comm; param->stream = stream; return ncclSuccess; @@ -415,14 +416,21 @@ ncclResult_t mscclEnqueueCheck( void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, mscclFunc_t func, ncclComm_t comm, cudaStream_t stream) { - INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure check: %d", *comm->abortFlag, nicfailure); + int nicStat = 0; + ncclNetIb.getStatus(&nicStat); + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure: %d", *comm->abortFlag, nicStat); mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); bool repair = false; if (*comm->abortFlag) { - *comm->abortFlag=0; - repair = true; + int nicStat = 0; + ncclNetIb.getStatus(&nicStat); + if (nicStat) + { + *comm->abortFlag=0; + repair = true; + } } threadLocalStatus.savedSchedulerParams.push_back({}); @@ -459,6 +467,11 @@ ncclResult_t mscclEnqueueCheck( default: return ncclInvalidUsage; } + if(repair) + { + repair = false; + ncclNetIb.setStatus(0); + } return ncclSuccess; } diff --git a/src/proxy.cc b/src/proxy.cc index b2d6e99..6d5f340 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -18,6 +18,8 @@ #include #include +extern ncclNet_t ncclNetIb; + static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true; @@ -1104,8 +1106,6 @@ ncclResult_t ncclProxyCallAsync(struct ncclComm* comm, struct ncclProxyConnector struct ncclSocket* sock; ncclResult_t ret = ncclSuccess; struct ncclProxyState* sharedProxyState = comm->proxyState; - - WARN("ncclProxyCallAsync() called"); if (sharedProxyState->peerSocks == NULL) return ncclInternalError; @@ -1434,27 +1434,11 @@ void* ncclProxyService(void* _args) { int npeers = 0; int stop = 0; int asyncOpCount = 0; - int nloopcount = 0; while (stop == 0 || (stop == 1 && npeers > 0)) { /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer * connections. Need to wait until all other related comms call abort and safely exit * together, or we could face segmentation fault. */ - nloopcount++; - // if (nicfailure) - // { - // *proxyState->abortFlag = 1; - // WARN("[Proxy Service] detected nic failure, change abort flag to 1"); - // } - // else - // { - // WARN("[Proxy Service] not able to detected the nic failure"); - // } - WARN("[Proxy Service] still in service loop %d", nloopcount); - if (*proxyState->abortFlag != 0) - { - stop = 1; - WARN("[Proxy Service] Received abort flag!"); - } + if (*proxyState->abortFlag != 0) stop = 1; /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ int ret; do { @@ -1558,7 +1542,6 @@ void* ncclProxyService(void* _args) { } // Wait for all operations to complete and stop progress thread before freeing any resource - WARN("[Proxy Service] will destory soon!"); if (ncclProxyProgressDestroy(proxyState) != ncclSuccess) { WARN("[Proxy Service] proxyDestroy failed"); } @@ -1583,10 +1566,11 @@ void* ncclProxyServiceDaemon(void* _args) { while(1) { - status[comm->rank] = nicfailure; - if (nicfailure) + int nicStat = 0; + status[comm->rank] = ncclNetIb.getStatus(&nicStat); + if (nicStat) { - WARN("[Proxy Service] ncclProxyServiceDaemon, rank: %d detect the nic failure, will start to use allgather to notify others: %d", comm->rank, nicfailure); + WARN("[Proxy Service] ncclProxyServiceDaemon, rank: %d detect the nic failure, will start to use allgather to notify others: %d", comm->rank, nicStat); } bootstrapAllGather(comm->bootstrap, status, sizeof(int)); int all_status = 0; @@ -1600,7 +1584,6 @@ void* ncclProxyServiceDaemon(void* _args) { { WARN("[Proxy Service] ncclProxyServiceDaemon, detect the nic failure, will step the proxy service now"); *comm->abortFlag=1; - ncclProxyStop(comm); break; } else diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index fb2ea34..a805c58 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -28,7 +28,7 @@ #define MAXNAMESIZE 64 static char ncclIbIfName[MAX_IF_NAME_SIZE+1]; static union ncclSocketAddress ncclIbIfAddr; -int nicfailure = 0; +int nicStatus = 0; struct ncclIbMr { uintptr_t addr; @@ -96,8 +96,8 @@ static void* ncclIbAsyncThreadMain(void* args) { { WARN("NET/IB : Got async event : %s, event type: %d", str, event.event_type); if (strcmp(str, "local catastrophic error") == 0) { - WARN("NET/IB : Detect Nic failure, will repaire soon, event type: %d", event.event_type); - nicfailure = 1; + WARN("NET/IB : Detect Nic failure, will repair soon, event type: %d", event.event_type); + nicStatus = 1; break; } } @@ -344,6 +344,16 @@ ncclResult_t ncclIbGetProperties(int dev, ncclNetProperties_t* props) { return ncclSuccess; } +ncclResult_t ncclIbGetStatus(int* nstat) { + *nstat = nicStatus; + return ncclSuccess; +} + +ncclResult_t ncclIbSetStatus(int nstat) { + nicStatus = nstat; + return ncclSuccess; +} + // We need to support NCCL_NET_MAX_REQUESTS for each concurrent receive #define MAX_REQUESTS (NCCL_NET_MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS) static_assert(MAX_REQUESTS <= 256, "request id are encoded in wr_id and we need up to 8 requests ids per completion"); @@ -1376,6 +1386,8 @@ ncclNet_t ncclNetIb = { ncclIbInit, ncclIbDevices, ncclIbGetProperties, + ncclIbGetStatus, + ncclIbSetStatus, ncclIbListen, ncclIbConnect, ncclIbAccept, diff --git a/src/transport/net_socket.cc b/src/transport/net_socket.cc index 08a8c3a..801d194 100644 --- a/src/transport/net_socket.cc +++ b/src/transport/net_socket.cc @@ -104,6 +104,14 @@ ncclResult_t ncclNetSocketGetProperties(int dev, ncclNetProperties_t* props) { return ncclSuccess; } +ncclResult_t ncclNetSocketGetStatus(int* nstat){ + return ncclSuccess; +} +ncclResult_t ncclNetSocketSetStatus(int nstat){ + return ncclSuccess; +} + + /* Communication functions */ #define MAX_SOCKETS 64 @@ -597,6 +605,8 @@ ncclNet_t ncclNetSocket = { ncclNetSocketInit, ncclNetSocketDevices, ncclNetSocketGetProperties, + ncclNetSocketGetStatus, + ncclNetSocketSetStatus, ncclNetSocketListen, ncclNetSocketConnect, ncclNetSocketAccept, From e1bc9b0f984bb6e0136a370338b2d6f22d96af67 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Tue, 12 Dec 2023 08:10:01 +0000 Subject: [PATCH 04/15] fix the status fetch regression bug --- src/proxy.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index 6d5f340..6052e16 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1567,12 +1567,13 @@ void* ncclProxyServiceDaemon(void* _args) { while(1) { int nicStat = 0; - status[comm->rank] = ncclNetIb.getStatus(&nicStat); + ncclNetIb.getStatus(&nicStat); + status[comm->rank] = nicStat; if (nicStat) { WARN("[Proxy Service] ncclProxyServiceDaemon, rank: %d detect the nic failure, will start to use allgather to notify others: %d", comm->rank, nicStat); } - bootstrapAllGather(comm->bootstrap, status, sizeof(int)); + bootstrapAllGather(comm->bootstrap, status, sizeof(int) * comm->nRanks); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { all_status |= status[i]; From 0bcafb0d0a883c4df15a3c7353ac1fa48f4c4590 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Wed, 13 Dec 2023 16:49:47 +0000 Subject: [PATCH 05/15] fix according to code review comments --- src/bootstrap.cc | 7 ----- src/collectives/all_gather.cc | 20 ++++++++++---- src/collectives/all_reduce.cc | 20 ++++++++++---- src/collectives/all_to_all.cc | 35 +++++++++++++++--------- src/collectives/broadcast.cc | 15 +++++++++-- src/collectives/reduce.cc | 18 ++++++++++--- src/collectives/reduce_scatter.cc | 18 ++++++++++--- src/collectives/sendrecv.cc | 41 +++++++++++++++++++---------- src/include/bootstrap.h | 10 ------- src/include/msccl/msccl_lifecycle.h | 2 ++ src/include/msccl/msccl_scheduler.h | 15 ++++++++++- src/misc/msccl/msccl_lifecycle.cc | 28 +++++++++++++++++--- src/proxy.cc | 39 +++++++++++++++------------ src/transport/net_ib.cc | 2 +- 14 files changed, 184 insertions(+), 86 deletions(-) diff --git a/src/bootstrap.cc b/src/bootstrap.cc index 4e831d1..a88e71b 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -593,10 +593,3 @@ ncclResult_t bootstrapAbort(void* commState) { free(state); return ncclSuccess; } - -__attribute__((visibility("default"))) ncclBootstrapInterface ncclBootstrap = { - .name = NCCL_BOOTSTRAP_NAME, - .send = bootstrapSend, - .receive = bootstrapRecv, - .allgather = bootstrapAllGather, -}; diff --git a/src/collectives/all_gather.cc b/src/collectives/all_gather.cc index 767ab49..b9a5700 100644 --- a/src/collectives/all_gather.cc +++ b/src/collectives/all_gather.cc @@ -7,9 +7,12 @@ #include "enqueue.h" #include "collectives.h" +#include "param.h" #include "msccl/msccl_lifecycle.h" +extern int64_t ncclParamResilientEnabled(); + NCCL_API(ncclResult_t, ncclAllGather, const void* sendbuff, void* recvbuff, size_t sendcount, ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream); ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, @@ -21,14 +24,21 @@ ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcoun size_t msgsize = sendcount * ncclTypeSize(datatype); NVTX3_FUNC_WITH_PARAMS(AllGather, AllGatherSchema, msgsize) + ncclResult_t ret; if (mscclAvailable() && !mscclIsCaller()) { - return mscclEnqueueCheck( + ret = mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, sendcount, datatype, 0, 0, ncclSum, mscclFuncAllGather, comm, stream); } + else{ + struct ncclInfo info = { ncclFuncAllGather, "AllGather", + sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */ + ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS }; + ret = ncclEnqueueCheck(&info); + } + if (ncclParamResilientEnabled()){ + cudaStreamSynchronize(stream); + } - struct ncclInfo info = { ncclFuncAllGather, "AllGather", - sendbuff, recvbuff, sendcount, datatype, ncclSum, 0, comm, stream, /* Args */ - ALLGATHER_CHUNKSTEPS, ALLGATHER_SLICESTEPS }; - return ncclEnqueueCheck(&info); + return ret; } diff --git a/src/collectives/all_reduce.cc b/src/collectives/all_reduce.cc index a4c8f96..2dc3106 100644 --- a/src/collectives/all_reduce.cc +++ b/src/collectives/all_reduce.cc @@ -7,8 +7,10 @@ #include "enqueue.h" #include "nccl.h" +#include "param.h" #include "msccl/msccl_lifecycle.h" +extern int64_t ncclParamResilientEnabled(); NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream); @@ -26,14 +28,22 @@ ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, }; NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op}; NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload) + + ncclResult_t ret; if (mscclAvailable() && !mscclIsCaller()) { - return mscclEnqueueCheck( + ret = mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, count, datatype, 0, 0, op, mscclFuncAllReduce, comm, stream); } + else{ + struct ncclInfo info = { ncclFuncAllReduce, "AllReduce", + sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */ + ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS }; + ret = ncclEnqueueCheck(&info); + } + if (ncclParamResilientEnabled()){ + cudaStreamSynchronize(stream); + } - struct ncclInfo info = { ncclFuncAllReduce, "AllReduce", - sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */ - ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS }; - return ncclEnqueueCheck(&info); + return ret; } diff --git a/src/collectives/all_to_all.cc b/src/collectives/all_to_all.cc index a1bc655..81a8913 100644 --- a/src/collectives/all_to_all.cc +++ b/src/collectives/all_to_all.cc @@ -8,28 +8,39 @@ #include "enqueue.h" #include "collectives.h" #include "graph/topo.h" +#include "param.h" #include "msccl/msccl_lifecycle.h" +extern int64_t ncclParamResilientEnabled(); + NCCL_API(ncclResult_t, ncclAllToAll, const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream); ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream) { + ncclResult_t ret; + if (mscclAvailable() && !mscclIsCaller()) { - return mscclEnqueueCheck( + ret = mscclEnqueueCheck( sendbuff, nullptr, nullptr, recvbuff, nullptr, nullptr, count, datatype, 0, 0, ncclSum, mscclFuncAllToAll, comm, stream); } - - size_t rankOffset = count * ncclTypeSize(datatype); - int nRanks; - NCCLCHECK(ncclCommCount(comm, &nRanks)); - if (count == 0) return ncclSuccess; - NCCLCHECK(ncclGroupStart()); - for (int r=0; r #include "alloc.h" +#include "bootstrap.h" #include "checks.h" #include "graph/topo.h" @@ -21,12 +22,17 @@ #include "msccl/msccl_parser.h" #include "msccl/msccl_setup.h" #include "msccl/msccl_status.h" +#include "msccl/msccl_scheduler.h" NCCL_PARAM(MscclEnabled, "MSCCL_ENABLE", 1); static std::atomic mscclInitialized; static bool mscclSchedulerTriedLoadAlgo = false; static std::mutex mscclLifecycleMutex; +extern ncclResult_t bootstrapAllGather(void* commState, void* allData, int size); +extern ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size); +extern ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size); extern ncclNet_t ncclNetIb; +extern int64_t ncclParamResilientEnabled(); int getEnvInt(const char* env, int64_t deftVal) { char* str = getenv(env); @@ -58,6 +64,11 @@ bool mscclAvailable() { return mscclEnabled() && mscclInitialized.load(std::memory_order_acquire); } +mscclSchedulerInitParam& mscclGetSchedulerInitParam() { + static mscclSchedulerInitParam initParam; + return initParam; +} + static bool mscclCommCompatible(ncclComm_t comm) { std::map> hostHashToPidHashes; for (int i = 0; i < comm->nRanks; i++) { @@ -155,7 +166,7 @@ static ncclResult_t mscclInternalSchedulerInit() { return ncclSuccess; } -static ncclResult_t mscclSchedulerInit(ncclComm_t comm) { +static ncclResult_t mscclSchedulerInit(mscclSchedulerInitParam *initParam) { mscclStatus& status = mscclGetStatus(); bool useInternalScheduler = false; @@ -178,7 +189,7 @@ static ncclResult_t mscclSchedulerInit(ncclComm_t comm) { if (useInternalScheduler) { NCCLCHECK(mscclInternalSchedulerInit()); } else { - NCCLCHECK(status.mscclSchedulerPtr->init(comm)); + NCCLCHECK(status.mscclSchedulerPtr->init(initParam)); } return ncclSuccess; } @@ -218,7 +229,16 @@ ncclResult_t mscclInit(ncclComm_t comm) { status.needsProxy = false; mscclSchedulerTriedLoadAlgo = false; - NCCLCHECK(mscclSchedulerInit(comm)); + mscclSchedulerInitParam initParam = mscclGetSchedulerInitParam(); + initParam.nRanks = comm->nRanks; + initParam.rank = comm->rank; + initParam.nNodes = comm->nNodes; + initParam.bootstrap = comm->bootstrap; + initParam.send = bootstrapSend; + initParam.receive = bootstrapRecv; + initParam.allgather = bootstrapAllGather; + + NCCLCHECK(mscclSchedulerInit(&initParam)); mscclInitialized.store(true, std::memory_order_release); } @@ -422,7 +442,7 @@ ncclResult_t mscclEnqueueCheck( mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); bool repair = false; - if (*comm->abortFlag) + if (ncclParamResilientEnabled() && *comm->abortFlag) { int nicStat = 0; ncclNetIb.getStatus(&nicStat); diff --git a/src/proxy.cc b/src/proxy.cc index 6052e16..d3dc58d 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -14,11 +14,15 @@ #include "profiler.h" #define ENABLE_TIMER 0 #include "timer.h" +#include "param.h" #include #include extern ncclNet_t ncclNetIb; +static bool resilientDaemonRunning = false; +NCCL_PARAM(ResilientEnabled, "RESILIENT_ENABLED", 0); +NCCL_PARAM(ResilientCheckInterval, "RESILIENT_CHECK_INTERVAL", 5); static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true; @@ -1555,45 +1559,42 @@ void* ncclProxyService(void* _args) { return NULL; } -void* ncclProxyServiceDaemon(void* _args) { +void* ncclResilientDaemon(void* _args) { ncclComm *comm = (ncclComm*)_args; - WARN("[Proxy Service] start the ncclProxyServiceDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); + WARN("[Proxy Service] start the ncclResilientDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); int* status = NULL; status = new int[comm->nRanks]; for (int i = 0; i < comm->nRanks; ++i) { status[i] = 0; } + int interval = ncclParamResilientCheckInterval(); - while(1) + while(resilientDaemonRunning) { int nicStat = 0; ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; - if (nicStat) - { - WARN("[Proxy Service] ncclProxyServiceDaemon, rank: %d detect the nic failure, will start to use allgather to notify others: %d", comm->rank, nicStat); - } bootstrapAllGather(comm->bootstrap, status, sizeof(int) * comm->nRanks); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { all_status |= status[i]; } - WARN("[Proxy Service] ncclProxyServiceDaemon, finished allgather, all_status: %d", all_status); + WARN("[Proxy Service] ncclResilientDaemon, finished allgather, all_status: %d", all_status); - if (all_status != 0) + if (0 != all_status && 0 == *comm->abortFlag) { - WARN("[Proxy Service] ncclProxyServiceDaemon, detect the nic failure, will step the proxy service now"); + WARN("[Proxy Service] ncclResilientDaemon, detect the nic failure, will step the proxy service now"); *comm->abortFlag=1; - break; + continue; } else { - WARN("[Proxy Service] ncclProxyServiceDaemon, not detect the nic failure, will check it again later"); - sleep(5); + WARN("[Proxy Service] ncclResilientDaemon, not detect the nic failure, will check it again later"); + sleep(interval); } } - WARN("[Proxy Service] will quit the ncclProxyServiceDaemon now"); + WARN("[Proxy Service] will quit the ncclResilientDaemon now"); return NULL; } @@ -1627,10 +1628,14 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->ncclCollNet = comm->ncclCollNet; memcpy(proxyState->buffSizes, comm->buffSizes, sizeof(comm->buffSizes)); - pthread_t thread; pthread_create(&comm->proxyState->thread, NULL, ncclProxyService, comm->proxyState); ncclSetThreadName(comm->proxyState->thread, "NCCL Service %2d", comm->cudaDev); - pthread_create(&thread, NULL, ncclProxyServiceDaemon, comm); + + if (ncclParamResilientEnabled()){ + pthread_t resilientDaemonthread; + resilientDaemonRunning = true; + pthread_create(&resilientDaemonthread, NULL, ncclResilientDaemon, comm); + } } return ncclSuccess; } @@ -1673,7 +1678,7 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) { } } } - + resilientDaemonRunning = false; return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index a805c58..23a6c52 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -28,7 +28,7 @@ #define MAXNAMESIZE 64 static char ncclIbIfName[MAX_IF_NAME_SIZE+1]; static union ncclSocketAddress ncclIbIfAddr; -int nicStatus = 0; +static int nicStatus = 0; struct ncclIbMr { uintptr_t addr; From 4519d2c6e6f8f7a4617ba304d01fa016d1c5be1b Mon Sep 17 00:00:00 2001 From: Andy Li Date: Wed, 13 Dec 2023 17:07:08 +0000 Subject: [PATCH 06/15] clean up code --- src/bootstrap.cc | 2 -- src/collectives/device/prims_ll.h | 3 --- src/collectives/device/prims_ll128.h | 3 --- src/include/bootstrap.h | 1 - src/include/msccl/msccl_lifecycle.h | 2 -- src/include/msccl/msccl_scheduler.h | 1 - src/misc/msccl/msccl_lifecycle.cc | 1 - src/proxy.cc | 2 -- 8 files changed, 15 deletions(-) diff --git a/src/bootstrap.cc b/src/bootstrap.cc index a88e71b..764cb6c 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -13,8 +13,6 @@ #include #include "proxy.h" -#define NCCL_BOOTSTRAP_NAME "github.com/Azure/msccl-executor/bootstrap" - struct bootstrapRootArgs { struct ncclSocket* listenSock; uint64_t magic; diff --git a/src/collectives/device/prims_ll.h b/src/collectives/device/prims_ll.h index acc2c87..3c4355f 100644 --- a/src/collectives/device/prims_ll.h +++ b/src/collectives/device/prims_ll.h @@ -8,8 +8,6 @@ #include "npkit/npkit.h" #endif -#include - template class Primitives: public PrimitivesWithoutDirect> { @@ -77,7 +75,6 @@ class Primitives: inline __device__ int checkAbort(int &spins, int send) { spins++; if (abort == 0 && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { - printf("checkAbort LL, abortFlag:%u \n", *ncclShmem.comm.abortFlag); abort = *ncclShmem.comm.abortFlag; spins = 0; } diff --git a/src/collectives/device/prims_ll128.h b/src/collectives/device/prims_ll128.h index afc9c3e..6b906ce 100644 --- a/src/collectives/device/prims_ll128.h +++ b/src/collectives/device/prims_ll128.h @@ -11,8 +11,6 @@ #include "npkit/npkit.h" #endif -#include - #define NCCL_LL128_FLAGTHREAD (NCCL_LL128_LINEELEMS-1) template @@ -81,7 +79,6 @@ class Primitives: inline __device__ int checkAbort(int &spins, int i, int send) { spins++; if (abort == 0 && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { - printf("checkAbort LL128, abortFlag:%u \n", *ncclShmem.comm.abortFlag); abort = *ncclShmem.comm.abortFlag; spins = 0; } diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index 9768b04..400a479 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -29,5 +29,4 @@ ncclResult_t bootstrapIntraNodeAllGather(void* commState, int *ranks, int rank, ncclResult_t bootstrapIntraNodeBroadcast(void* commState, int *ranks, int rank, int nranks, int root, void* bcastData, int size); ncclResult_t bootstrapClose(void* commState); ncclResult_t bootstrapAbort(void* commState); - #endif diff --git a/src/include/msccl/msccl_lifecycle.h b/src/include/msccl/msccl_lifecycle.h index 2cf3d1a..5ae3584 100644 --- a/src/include/msccl/msccl_lifecycle.h +++ b/src/include/msccl/msccl_lifecycle.h @@ -10,8 +10,6 @@ #include "msccl/msccl_struct.h" -// NCCL_PARAM(ResilientEnabled, "RESILIENT_ENABLED", 0); - bool mscclEnabled(); void mscclSetIsCallerFlag(); diff --git a/src/include/msccl/msccl_scheduler.h b/src/include/msccl/msccl_scheduler.h index 1636a67..23e1840 100644 --- a/src/include/msccl/msccl_scheduler.h +++ b/src/include/msccl/msccl_scheduler.h @@ -37,7 +37,6 @@ struct mscclSchedulerParam { bool scheduled; mscclAlgoHandle_t handle; bool repair; - ncclComm_t comm; }; typedef struct { diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 8aa8384..dafc635 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -350,7 +350,6 @@ static ncclResult_t mscclSetSavedSchedulerParam( param->p.rank = comm->rank; param->p.nRanks = comm->nRanks; param->p.repair = repair; - param->p.comm = comm; param->comm = comm; param->stream = stream; return ncclSuccess; diff --git a/src/proxy.cc b/src/proxy.cc index d3dc58d..164099b 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -867,7 +867,6 @@ void* ncclProxyProgress(void *proxyState_) { ncclResult_t ret = progressOps(proxyState, state, state->active, &idle); if (ret != ncclSuccess) { INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); - //*proxyState->abortFlag = 1; return NULL; } if (lastIdle == 0 && idle == 1) ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileIdle); @@ -1391,7 +1390,6 @@ static ncclResult_t proxyServiceInitOp(int type, struct ncclProxyLocalPeer* peer } #include -extern int nicfailure; static bool proxyMatchOpType(int type) { switch (type) { case ncclProxyMsgInit: From 06591a261812a2e4e5241de3feb37f86063d6141 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Thu, 14 Dec 2023 16:55:10 +0000 Subject: [PATCH 07/15] fix the bootstrap function call issue --- src/include/msccl/msccl_scheduler.h | 4 ++++ src/misc/msccl/msccl_lifecycle.cc | 19 +++++++++++++------ src/proxy.cc | 10 +++++----- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/include/msccl/msccl_scheduler.h b/src/include/msccl/msccl_scheduler.h index 23e1840..9fd6e6a 100644 --- a/src/include/msccl/msccl_scheduler.h +++ b/src/include/msccl/msccl_scheduler.h @@ -37,6 +37,10 @@ struct mscclSchedulerParam { bool scheduled; mscclAlgoHandle_t handle; bool repair; + void* bootstrap; + ncclResult_t (*send)(void* commState, int peer, int tag, void* data, int size); + ncclResult_t (*receive)(void* commState, int peer, int tag, void* data, int size); + ncclResult_t (*allgather)(void* commState, void* allData, int size); }; typedef struct { diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index dafc635..a67495b 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -234,9 +234,9 @@ ncclResult_t mscclInit(ncclComm_t comm) { initParam.rank = comm->rank; initParam.nNodes = comm->nNodes; initParam.bootstrap = comm->bootstrap; - initParam.send = bootstrapSend; - initParam.receive = bootstrapRecv; - initParam.allgather = bootstrapAllGather; + initParam.send = &bootstrapSend; + initParam.receive = &bootstrapRecv; + initParam.allgather = &bootstrapAllGather; NCCLCHECK(mscclSchedulerInit(&initParam)); @@ -333,7 +333,10 @@ static ncclResult_t mscclSetSavedSchedulerParam( const void* sendBuff, const size_t sendCounts[], const size_t sDisPls[], void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, - mscclFunc_t func, ncclComm_t comm, cudaStream_t stream, bool repair, + mscclFunc_t func, ncclComm_t comm, cudaStream_t stream, bool repair, + ncclResult_t (*send)(void* commState, int peer, int tag, void* data, int size), + ncclResult_t (*receive)(void* commState, int peer, int tag, void* data, int size), + ncclResult_t (*allgather)(void* commState, void* allData, int size), struct mscclSavedSchedulerParam* param) { param->p.sendBuff = sendBuff; param->p.sendCounts = sendCounts; @@ -350,6 +353,10 @@ static ncclResult_t mscclSetSavedSchedulerParam( param->p.rank = comm->rank; param->p.nRanks = comm->nRanks; param->p.repair = repair; + param->p.bootstrap = comm->bootstrap; + param->p.send = send; + param->p.receive = receive; + param->p.allgather = allgather; param->comm = comm; param->stream = stream; return ncclSuccess; @@ -455,12 +462,12 @@ ncclResult_t mscclEnqueueCheck( threadLocalStatus.savedSchedulerParams.push_back({}); NCCLCHECK(mscclSetSavedSchedulerParam( sendBuff, sendCounts, sDisPls, recvBuff, recvCounts, rDisPls, - count, dataType, root, peer, op, func, comm, stream, repair, + count, dataType, root, peer, op, func, comm, stream, repair, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, &threadLocalStatus.savedSchedulerParams.back())); switch (threadLocalStatus.groupStatus) { case mscclNoGroup: - if (comm->mscclCompatible) { + if (comm->mscclCompatible) { NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { NCCLCHECK(mscclRunSavedParams()); diff --git a/src/proxy.cc b/src/proxy.cc index 164099b..540c95c 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1559,7 +1559,7 @@ void* ncclProxyService(void* _args) { void* ncclResilientDaemon(void* _args) { ncclComm *comm = (ncclComm*)_args; - WARN("[Proxy Service] start the ncclResilientDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); + INFO(NCCL_INIT, "[Proxy Service] start the ncclResilientDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); int* status = NULL; status = new int[comm->nRanks]; for (int i = 0; i < comm->nRanks; ++i) { @@ -1578,21 +1578,21 @@ void* ncclResilientDaemon(void* _args) { all_status |= status[i]; } - WARN("[Proxy Service] ncclResilientDaemon, finished allgather, all_status: %d", all_status); + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, all_status: %d", all_status); if (0 != all_status && 0 == *comm->abortFlag) { - WARN("[Proxy Service] ncclResilientDaemon, detect the nic failure, will step the proxy service now"); + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will stop the proxy service now"); *comm->abortFlag=1; continue; } else { - WARN("[Proxy Service] ncclResilientDaemon, not detect the nic failure, will check it again later"); + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, not detect the nic failure, will check it again later"); sleep(interval); } } - WARN("[Proxy Service] will quit the ncclResilientDaemon now"); + INFO(NCCL_INIT, "[Proxy Service] will quit the ncclResilientDaemon now"); return NULL; } From 30a7b3be037602051ccf45eb474656a01746257f Mon Sep 17 00:00:00 2001 From: Andy Li Date: Wed, 20 Dec 2023 08:24:52 +0000 Subject: [PATCH 08/15] add some debug info --- src/misc/msccl/msccl_lifecycle.cc | 2 +- src/proxy.cc | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index a67495b..821406b 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -454,7 +454,7 @@ ncclResult_t mscclEnqueueCheck( ncclNetIb.getStatus(&nicStat); if (nicStat) { - *comm->abortFlag=0; + *comm->abortFlag = 0; repair = true; } } diff --git a/src/proxy.cc b/src/proxy.cc index 540c95c..56f1930 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1572,9 +1572,11 @@ void* ncclResilientDaemon(void* _args) { int nicStat = 0; ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, rank %d nic status: %d", comm->rank, nicStat); bootstrapAllGather(comm->bootstrap, status, sizeof(int) * comm->nRanks); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, status:%d %d", i, status[i]); all_status |= status[i]; } From 0262babdd17b8878b584718d2b469c13f7b00ae2 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Thu, 21 Dec 2023 02:27:44 +0000 Subject: [PATCH 09/15] fix bootstrap allgather bug --- src/proxy.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proxy.cc b/src/proxy.cc index 56f1930..6fce851 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1573,7 +1573,7 @@ void* ncclResilientDaemon(void* _args) { ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, rank %d nic status: %d", comm->rank, nicStat); - bootstrapAllGather(comm->bootstrap, status, sizeof(int) * comm->nRanks); + bootstrapAllGather(comm->bootstrap, status, sizeof(int)); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, status:%d %d", i, status[i]); From 1bd3e5397854036af8cbd3fee835353062b9887f Mon Sep 17 00:00:00 2001 From: Andy Li Date: Thu, 21 Dec 2023 02:44:42 +0000 Subject: [PATCH 10/15] fix the ncclResilientDaemon bug --- src/proxy.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index 6fce851..43d276f 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1560,16 +1560,16 @@ void* ncclProxyService(void* _args) { void* ncclResilientDaemon(void* _args) { ncclComm *comm = (ncclComm*)_args; INFO(NCCL_INIT, "[Proxy Service] start the ncclResilientDaemon now, ranks:%d, rank:%d", comm->nRanks, comm->rank); - int* status = NULL; - status = new int[comm->nRanks]; - for (int i = 0; i < comm->nRanks; ++i) { - status[i] = 0; - } int interval = ncclParamResilientCheckInterval(); - + int *status = (int*)malloc(comm->nRanks * sizeof(int)); + if (status == NULL) { + WARN("[Proxy Service] ncclResilientDaemon, memory allocation failed, ncclResilientDaemon will quit soon"); + } while(resilientDaemonRunning) { int nicStat = 0; + memset(status, 0, comm->nRanks * sizeof(status)); + ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, rank %d nic status: %d", comm->rank, nicStat); @@ -1594,6 +1594,8 @@ void* ncclResilientDaemon(void* _args) { sleep(interval); } } + free(status); + INFO(NCCL_INIT, "[Proxy Service] will quit the ncclResilientDaemon now"); return NULL; } From c0ba1acc4805d5d48bbec465a5b85992e45d082f Mon Sep 17 00:00:00 2001 From: Andy Li Date: Thu, 21 Dec 2023 04:15:48 +0000 Subject: [PATCH 11/15] add some debug info --- src/include/nccl_net.h | 2 +- src/misc/msccl/msccl_lifecycle.cc | 3 ++- src/proxy.cc | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/include/nccl_net.h b/src/include/nccl_net.h index 387b52b..7ee3fbf 100644 --- a/src/include/nccl_net.h +++ b/src/include/nccl_net.h @@ -50,7 +50,7 @@ typedef struct { ncclResult_t (*getProperties)(int dev, ncclNetProperties_v6_t* props); // Get device status. ncclResult_t (*getStatus)(int* nstat); - // Get various device properties. + // Set device status. ncclResult_t (*setStatus)(int nstat); // Create a receiving object and provide a handle to connect to it. The // handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 821406b..11fcfb0 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -456,6 +456,8 @@ ncclResult_t mscclEnqueueCheck( { *comm->abortFlag = 0; repair = true; + ncclNetIb.setStatus(0); + INFO(NCCL_INIT, "MSCCL: Reset nic status to 0 for rank: %d", comm->rank); } } @@ -496,7 +498,6 @@ ncclResult_t mscclEnqueueCheck( if(repair) { repair = false; - ncclNetIb.setStatus(0); } return ncclSuccess; } diff --git a/src/proxy.cc b/src/proxy.cc index 43d276f..76ebb69 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1568,12 +1568,12 @@ void* ncclResilientDaemon(void* _args) { while(resilientDaemonRunning) { int nicStat = 0; - memset(status, 0, comm->nRanks * sizeof(status)); + memset(status, 0, comm->nRanks * sizeof(int)); ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, rank %d nic status: %d", comm->rank, nicStat); - bootstrapAllGather(comm->bootstrap, status, sizeof(int)); + bootstrapAllGather(comm->bootstrap, status, comm->nRanks * sizeof(int)); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, status:%d %d", i, status[i]); From e4a1937235680c1f48a987e850f293037b133f14 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Mon, 25 Dec 2023 03:24:08 +0000 Subject: [PATCH 12/15] remove some debug info --- src/collectives/device/prims_simple.h | 2 +- src/misc/msccl/msccl_lifecycle.cc | 2 +- src/proxy.cc | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/collectives/device/prims_simple.h b/src/collectives/device/prims_simple.h index 7cd99c0..dba3b86 100644 --- a/src/collectives/device/prims_simple.h +++ b/src/collectives/device/prims_simple.h @@ -113,8 +113,8 @@ class Primitives< inline __device__ bool checkAbort(int &spins) { spins++; if (!(flags & Aborted) && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { - printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); if (*ncclShmem.comm.abortFlag) { + // printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); flags |= Aborted; ncclShmem.aborted = 1; } diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 11fcfb0..2b26c4b 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -444,12 +444,12 @@ ncclResult_t mscclEnqueueCheck( mscclFunc_t func, ncclComm_t comm, cudaStream_t stream) { int nicStat = 0; ncclNetIb.getStatus(&nicStat); - INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure: %d", *comm->abortFlag, nicStat); mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); bool repair = false; if (ncclParamResilientEnabled() && *comm->abortFlag) { + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure: %d", *comm->abortFlag, nicStat); int nicStat = 0; ncclNetIb.getStatus(&nicStat); if (nicStat) diff --git a/src/proxy.cc b/src/proxy.cc index 76ebb69..05f6a52 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1576,7 +1576,6 @@ void* ncclResilientDaemon(void* _args) { bootstrapAllGather(comm->bootstrap, status, comm->nRanks * sizeof(int)); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { - INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, status:%d %d", i, status[i]); all_status |= status[i]; } @@ -1584,7 +1583,7 @@ void* ncclResilientDaemon(void* _args) { if (0 != all_status && 0 == *comm->abortFlag) { - INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will stop the proxy service now"); + INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will abort the kernel execution now"); *comm->abortFlag=1; continue; } From c856fc59e7a1059e48401ec3241f50a8b25b57e4 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Wed, 27 Dec 2023 16:31:51 +0000 Subject: [PATCH 13/15] add logic to remove the ops from proxy when in resilient repairing mode --- src/include/comm.h | 2 ++ src/misc/msccl/msccl_lifecycle.cc | 41 ++++++++++++++----------------- src/proxy.cc | 34 ++++++++++++++++--------- 3 files changed, 43 insertions(+), 34 deletions(-) diff --git a/src/include/comm.h b/src/include/comm.h index 88e3ee7..3c66ae9 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -347,6 +347,8 @@ struct ncclComm { int finalizeRankCnt; // Whether this comm is compatible with MSCCL bool mscclCompatible; + // Whether this comm is current in resilient repairing mode + bool resilientRepairing; }; enum ncclLaunchMode { diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 2b26c4b..e73f777 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -33,6 +33,7 @@ extern ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data extern ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size); extern ncclNet_t ncclNetIb; extern int64_t ncclParamResilientEnabled(); +extern bool proxyResilientRepairingMode; int getEnvInt(const char* env, int64_t deftVal) { char* str = getenv(env); @@ -442,29 +443,12 @@ ncclResult_t mscclEnqueueCheck( void* recvBuff, const size_t recvCounts[], const size_t rDisPls[], size_t count, ncclDataType_t dataType, int root, int peer, ncclRedOp_t op, mscclFunc_t func, ncclComm_t comm, cudaStream_t stream) { - int nicStat = 0; - ncclNetIb.getStatus(&nicStat); mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); - bool repair = false; - - if (ncclParamResilientEnabled() && *comm->abortFlag) - { - INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck mscclNoGroup com abort flag: %d, nic failure: %d", *comm->abortFlag, nicStat); - int nicStat = 0; - ncclNetIb.getStatus(&nicStat); - if (nicStat) - { - *comm->abortFlag = 0; - repair = true; - ncclNetIb.setStatus(0); - INFO(NCCL_INIT, "MSCCL: Reset nic status to 0 for rank: %d", comm->rank); - } - } threadLocalStatus.savedSchedulerParams.push_back({}); NCCLCHECK(mscclSetSavedSchedulerParam( sendBuff, sendCounts, sDisPls, recvBuff, recvCounts, rDisPls, - count, dataType, root, peer, op, func, comm, stream, repair, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, + count, dataType, root, peer, op, func, comm, stream, comm->resilientRepairing, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, &threadLocalStatus.savedSchedulerParams.back())); switch (threadLocalStatus.groupStatus) { @@ -472,6 +456,15 @@ ncclResult_t mscclEnqueueCheck( if (comm->mscclCompatible) { NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { + if(ncclParamResilientEnabled() && comm->resilientRepairing) + { + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); + comm->resilientRepairing = false; + proxyResilientRepairingMode = false; + *comm->abortFlag = 0; + ncclNetIb.setStatus(0); + } + INFO(NCCL_INIT, "MSCCL: mscclRunSavedParams for rank: %d", comm->rank); NCCLCHECK(mscclRunSavedParams()); break; } @@ -482,6 +475,14 @@ ncclResult_t mscclEnqueueCheck( if (comm->mscclCompatible) { NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { + if(ncclParamResilientEnabled() && comm->resilientRepairing) + { + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); + comm->resilientRepairing = false; + proxyResilientRepairingMode = false; + *comm->abortFlag = 0; + ncclNetIb.setStatus(0); + } // Only save counts and displs when there is suitable MSCCL algorithm for this NCCLCHECK(mscclSaveCountsAndDispls(&threadLocalStatus.savedSchedulerParams.back())); break; @@ -495,10 +496,6 @@ ncclResult_t mscclEnqueueCheck( default: return ncclInvalidUsage; } - if(repair) - { - repair = false; - } return ncclSuccess; } diff --git a/src/proxy.cc b/src/proxy.cc index 05f6a52..6531298 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -21,6 +21,7 @@ extern ncclNet_t ncclNetIb; static bool resilientDaemonRunning = false; +static bool proxyResilientRepairingMode = false; NCCL_PARAM(ResilientEnabled, "RESILIENT_ENABLED", 0); NCCL_PARAM(ResilientCheckInterval, "RESILIENT_CHECK_INTERVAL", 5); @@ -687,7 +688,13 @@ static ncclResult_t progressOps(struct ncclProxyState* proxyState, struct ncclPr while (op) { if (op->state == ncclProxyOpNone) return ncclInternalError; TIME_START(0); TIME_START(1); - NCCLCHECK(op->progress(proxyState, op)); + if (proxyResilientRepairingMode) + { + // if in resilient repairing state, we need to clean up all the ops from the proxy + op->state = ncclProxyOpNone; + }else{ + NCCLCHECK(op->progress(proxyState, op)); + } if (op->idle) { TIME_STOP(1); TIME_CANCEL(0); } else { TIME_CANCEL(1); TIME_STOP(0); } *idle &= op->idle; if (op->state == ncclProxyOpNone) { @@ -887,6 +894,7 @@ void* ncclProxyProgress(void *proxyState_) { } lastIdle = idle; } + INFO(NCCL_ALL,"[Proxy Thread] ncclProxyProgress exit"); return NULL; } @@ -1440,7 +1448,10 @@ void* ncclProxyService(void* _args) { /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer * connections. Need to wait until all other related comms call abort and safely exit * together, or we could face segmentation fault. */ - if (*proxyState->abortFlag != 0) stop = 1; + if (*proxyState->abortFlag != 0) { + INFO(NCCL_INIT|NCCL_NET,"ncclProxyService: receive the abortFlag signal now \n"); + stop = 1; + } /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ int ret; do { @@ -1567,31 +1578,30 @@ void* ncclResilientDaemon(void* _args) { } while(resilientDaemonRunning) { + if (!comm->resilientRepairing) + { int nicStat = 0; memset(status, 0, comm->nRanks * sizeof(int)); ncclNetIb.getStatus(&nicStat); status[comm->rank] = nicStat; - INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, rank %d nic status: %d", comm->rank, nicStat); + bootstrapAllGather(comm->bootstrap, status, comm->nRanks * sizeof(int)); int all_status = 0; for (int i = 0; i < comm->nRanks; ++i) { all_status |= status[i]; } - - INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, finished allgather, all_status: %d", all_status); - if (0 != all_status && 0 == *comm->abortFlag) + if (0 != all_status) { INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will abort the kernel execution now"); - *comm->abortFlag=1; + comm->resilientRepairing = true; + *comm->abortFlag = 1; + proxyResilientRepairingMode = true; continue; } - else - { - INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, not detect the nic failure, will check it again later"); - sleep(interval); - } + } + sleep(interval); } free(status); From 2e4edc719244cc9237574f912f0a3ada9ca15dba Mon Sep 17 00:00:00 2001 From: Andy Li Date: Thu, 28 Dec 2023 03:57:26 +0000 Subject: [PATCH 14/15] seperate the abort flag to only indicate the resilient repairing status --- src/collectives/device/prims_simple.h | 4 ++-- src/include/comm.h | 2 +- src/include/devcomm.h | 3 ++- src/include/proxy.h | 2 ++ src/init.cc | 6 ++++++ src/misc/msccl/msccl_lifecycle.cc | 13 +++++-------- src/proxy.cc | 9 ++++----- 7 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/collectives/device/prims_simple.h b/src/collectives/device/prims_simple.h index dba3b86..0b5941d 100644 --- a/src/collectives/device/prims_simple.h +++ b/src/collectives/device/prims_simple.h @@ -113,8 +113,8 @@ class Primitives< inline __device__ bool checkAbort(int &spins) { spins++; if (!(flags & Aborted) && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { - if (*ncclShmem.comm.abortFlag) { - // printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); + if (*ncclShmem.comm.abortFlag || *ncclShmem.comm.resilientRepairing) { + printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); flags |= Aborted; ncclShmem.aborted = 1; } diff --git a/src/include/comm.h b/src/include/comm.h index 3c66ae9..12edf6c 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -348,7 +348,7 @@ struct ncclComm { // Whether this comm is compatible with MSCCL bool mscclCompatible; // Whether this comm is current in resilient repairing mode - bool resilientRepairing; + volatile bool *resilientRepairing; }; enum ncclLaunchMode { diff --git a/src/include/devcomm.h b/src/include/devcomm.h index 7aca4ae..1182fce 100644 --- a/src/include/devcomm.h +++ b/src/include/devcomm.h @@ -312,7 +312,8 @@ struct ncclDevComm { NpKitEventCollectContext* npKitEventCollectContexts; uint64_t* cpuTimestamp; #endif - + // Whether this comm is current in resilient repairing mode + volatile bool *resilientRepairing; }; struct alignas(16) ncclDevCommAndChannels { diff --git a/src/include/proxy.h b/src/include/proxy.h index d0067b1..ac3c552 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -228,6 +228,8 @@ struct ncclProxyState { // Queue of expected responses from the proxy struct ncclExpectedProxyResponse* expectedResponses; + // Whether this comm is current in resilient repairing mode + volatile bool *resilientRepairing; }; enum proxyConnectState { diff --git a/src/init.cc b/src/init.cc index e466363..a37edda 100644 --- a/src/init.cc +++ b/src/init.cc @@ -239,6 +239,7 @@ static ncclResult_t commFree(ncclComm_t comm) { if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) { NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag)); + NCCLCHECK(ncclCudaHostFree((void *)comm->resilientRepairing)); free(comm->abortFlagRefCount); } free((void*)comm->config.netName); @@ -400,6 +401,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) { tmpCommAndChans.comm.rank = comm->rank; tmpCommAndChans.comm.nRanks = nRanks; tmpCommAndChans.comm.abortFlag = comm->abortFlag; + tmpCommAndChans.comm.resilientRepairing = comm->resilientRepairing; for (int p=0; p < NCCL_NUM_PROTOCOLS; p++) { tmpCommAndChans.comm.buffSizes[p] = comm->buffSizes[p]; } @@ -1617,6 +1619,9 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUni NCCLCHECKGOTO(ncclCudaHostCalloc((uint32_t**)&comm->abortFlag, 1), res, fail); NCCLCHECKGOTO(ncclCalloc((uint32_t**)&comm->abortFlagRefCount, 1), res, fail); *comm->abortFlagRefCount = 1; + NCCLCHECKGOTO(ncclCudaHostCalloc((bool**)&comm->resilientRepairing, 1), res, fail); + *comm->resilientRepairing = false; + NCCLCHECKGOTO(parseCommConfig(comm, config), res, fail); /* start with ncclInternalError and will be changed to ncclSuccess if init succeeds. */ comm->initState = ncclInternalError; @@ -1636,6 +1641,7 @@ static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUni if (comm) { if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag); if (comm->abortFlagRefCount) free(comm->abortFlagRefCount); + if (comm->resilientRepairing) ncclCudaHostFree((void *)comm->resilientRepairing); free(comm); } if (newcomm) *newcomm = NULL; diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index e73f777..fa960d4 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -33,7 +33,6 @@ extern ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data extern ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size); extern ncclNet_t ncclNetIb; extern int64_t ncclParamResilientEnabled(); -extern bool proxyResilientRepairingMode; int getEnvInt(const char* env, int64_t deftVal) { char* str = getenv(env); @@ -448,7 +447,7 @@ ncclResult_t mscclEnqueueCheck( threadLocalStatus.savedSchedulerParams.push_back({}); NCCLCHECK(mscclSetSavedSchedulerParam( sendBuff, sendCounts, sDisPls, recvBuff, recvCounts, rDisPls, - count, dataType, root, peer, op, func, comm, stream, comm->resilientRepairing, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, + count, dataType, root, peer, op, func, comm, stream, *comm->resilientRepairing, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, &threadLocalStatus.savedSchedulerParams.back())); switch (threadLocalStatus.groupStatus) { @@ -459,9 +458,8 @@ ncclResult_t mscclEnqueueCheck( if(ncclParamResilientEnabled() && comm->resilientRepairing) { INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); - comm->resilientRepairing = false; - proxyResilientRepairingMode = false; - *comm->abortFlag = 0; + *comm->resilientRepairing = false; + // *comm->abortFlag = 0; ncclNetIb.setStatus(0); } INFO(NCCL_INIT, "MSCCL: mscclRunSavedParams for rank: %d", comm->rank); @@ -478,9 +476,8 @@ ncclResult_t mscclEnqueueCheck( if(ncclParamResilientEnabled() && comm->resilientRepairing) { INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); - comm->resilientRepairing = false; - proxyResilientRepairingMode = false; - *comm->abortFlag = 0; + *comm->resilientRepairing = false; + // *comm->abortFlag = 0; ncclNetIb.setStatus(0); } // Only save counts and displs when there is suitable MSCCL algorithm for this diff --git a/src/proxy.cc b/src/proxy.cc index 6531298..5bc2e85 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -21,7 +21,6 @@ extern ncclNet_t ncclNetIb; static bool resilientDaemonRunning = false; -static bool proxyResilientRepairingMode = false; NCCL_PARAM(ResilientEnabled, "RESILIENT_ENABLED", 0); NCCL_PARAM(ResilientCheckInterval, "RESILIENT_CHECK_INTERVAL", 5); @@ -688,7 +687,7 @@ static ncclResult_t progressOps(struct ncclProxyState* proxyState, struct ncclPr while (op) { if (op->state == ncclProxyOpNone) return ncclInternalError; TIME_START(0); TIME_START(1); - if (proxyResilientRepairingMode) + if (*proxyState->resilientRepairing) { // if in resilient repairing state, we need to clean up all the ops from the proxy op->state = ncclProxyOpNone; @@ -1595,9 +1594,8 @@ void* ncclResilientDaemon(void* _args) { if (0 != all_status) { INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will abort the kernel execution now"); - comm->resilientRepairing = true; - *comm->abortFlag = 1; - proxyResilientRepairingMode = true; + *comm->resilientRepairing = true; + // *comm->abortFlag = 1; continue; } } @@ -1637,6 +1635,7 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->dmaBufSupport = comm->dmaBufSupport; proxyState->ncclNet = comm->ncclNet; proxyState->ncclCollNet = comm->ncclCollNet; + proxyState->resilientRepairing = comm->resilientRepairing; memcpy(proxyState->buffSizes, comm->buffSizes, sizeof(comm->buffSizes)); pthread_create(&comm->proxyState->thread, NULL, ncclProxyService, comm->proxyState); From a850a62f1a84e8b1fa5ab7f8337f9aa3e207bf60 Mon Sep 17 00:00:00 2001 From: Andy Li Date: Tue, 26 Mar 2024 03:54:00 +0000 Subject: [PATCH 15/15] refine the code --- src/collectives/all_reduce.cc | 3 ++- src/collectives/device/prims_simple.h | 2 +- src/misc/msccl/msccl_lifecycle.cc | 16 +++++++++++++--- src/proxy.cc | 6 ++++-- src/transport/net_ib.cc | 1 - 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/collectives/all_reduce.cc b/src/collectives/all_reduce.cc index 2dc3106..bea6313 100644 --- a/src/collectives/all_reduce.cc +++ b/src/collectives/all_reduce.cc @@ -8,6 +8,7 @@ #include "enqueue.h" #include "nccl.h" #include "param.h" +#include #include "msccl/msccl_lifecycle.h" extern int64_t ncclParamResilientEnabled(); @@ -28,7 +29,7 @@ ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, }; NvtxParamsAllReduce payload{count * ncclTypeSize(datatype), op}; NVTX3_FUNC_WITH_PARAMS(AllReduce, AllReduceSchema, payload) - + INFO(NCCL_INIT, "MSCCL: Enter into ncclAllReduce now"); ncclResult_t ret; if (mscclAvailable() && !mscclIsCaller()) { ret = mscclEnqueueCheck( diff --git a/src/collectives/device/prims_simple.h b/src/collectives/device/prims_simple.h index 0b5941d..f269dfa 100644 --- a/src/collectives/device/prims_simple.h +++ b/src/collectives/device/prims_simple.h @@ -114,7 +114,7 @@ class Primitives< spins++; if (!(flags & Aborted) && spins == NCCL_SPINS_BEFORE_CHECK_ABORT) { if (*ncclShmem.comm.abortFlag || *ncclShmem.comm.resilientRepairing) { - printf("checkAbort Simple, abortFlag:%u \n", *ncclShmem.comm.abortFlag); + printf("checkAbort Simple, resilientRepairingFlag:%d, abortFlag:%u \n", *ncclShmem.comm.resilientRepairing, *ncclShmem.comm.abortFlag); flags |= Aborted; ncclShmem.aborted = 1; } diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index fa960d4..307ab86 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -445,6 +445,16 @@ ncclResult_t mscclEnqueueCheck( mscclThreadLocalStatus& threadLocalStatus = mscclGetThreadLocalStatus(); threadLocalStatus.savedSchedulerParams.push_back({}); + bool repair = false; + INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck now"); + // if(ncclParamResilientEnabled() && *comm->resilientRepairing) + // { + // INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); + // *comm->resilientRepairing = false; + // // *comm->abortFlag = 0; + // ncclNetIb.setStatus(0); + // repair = true; + // } NCCLCHECK(mscclSetSavedSchedulerParam( sendBuff, sendCounts, sDisPls, recvBuff, recvCounts, rDisPls, count, dataType, root, peer, op, func, comm, stream, *comm->resilientRepairing, &bootstrapSend, &bootstrapRecv, &bootstrapAllGather, @@ -455,9 +465,9 @@ ncclResult_t mscclEnqueueCheck( if (comm->mscclCompatible) { NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { - if(ncclParamResilientEnabled() && comm->resilientRepairing) + if(ncclParamResilientEnabled() && *comm->resilientRepairing) { - INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); + INFO(NCCL_INIT, "MSCCL: Enter into mscclNoGroup's mscclEnqueueCheck and in resilient repairing mode now"); *comm->resilientRepairing = false; // *comm->abortFlag = 0; ncclNetIb.setStatus(0); @@ -473,7 +483,7 @@ ncclResult_t mscclEnqueueCheck( if (comm->mscclCompatible) { NCCLCHECK(mscclSchedulerSelectAlgo(&threadLocalStatus.savedSchedulerParams.back())); if (threadLocalStatus.savedSchedulerParams.back().p.scheduled) { - if(ncclParamResilientEnabled() && comm->resilientRepairing) + if(ncclParamResilientEnabled() && *comm->resilientRepairing) { INFO(NCCL_INIT, "MSCCL: Enter into mscclEnqueueCheck and in resilient repairing mode now"); *comm->resilientRepairing = false; diff --git a/src/proxy.cc b/src/proxy.cc index 5bc2e85..0080e1e 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -21,6 +21,7 @@ extern ncclNet_t ncclNetIb; static bool resilientDaemonRunning = false; +static bool resilientRepairing = false; NCCL_PARAM(ResilientEnabled, "RESILIENT_ENABLED", 0); NCCL_PARAM(ResilientCheckInterval, "RESILIENT_CHECK_INTERVAL", 5); @@ -690,6 +691,7 @@ static ncclResult_t progressOps(struct ncclProxyState* proxyState, struct ncclPr if (*proxyState->resilientRepairing) { // if in resilient repairing state, we need to clean up all the ops from the proxy + INFO(NCCL_NET|NCCL_PROXY,"detect in resilient repairing mode, will start to remove Ops from the queue for rank: %d", proxyState->tpRank); op->state = ncclProxyOpNone; }else{ NCCLCHECK(op->progress(proxyState, op)); @@ -1577,7 +1579,7 @@ void* ncclResilientDaemon(void* _args) { } while(resilientDaemonRunning) { - if (!comm->resilientRepairing) + if (!*comm->resilientRepairing) { int nicStat = 0; memset(status, 0, comm->nRanks * sizeof(int)); @@ -1595,8 +1597,8 @@ void* ncclResilientDaemon(void* _args) { { INFO(NCCL_INIT, "[Proxy Service] ncclResilientDaemon, detect the nic failure, will abort the kernel execution now"); *comm->resilientRepairing = true; + resilientRepairing = true; // *comm->abortFlag = 1; - continue; } } sleep(interval); diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 23a6c52..e0b9e99 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -98,7 +98,6 @@ static void* ncclIbAsyncThreadMain(void* args) { if (strcmp(str, "local catastrophic error") == 0) { WARN("NET/IB : Detect Nic failure, will repair soon, event type: %d", event.event_type); nicStatus = 1; - break; } } if (ncclSuccess != wrap_ibv_ack_async_event(&event)) { break; }