diff --git a/apps/nccl/src/allreduce.hpp b/apps/nccl/src/allreduce.hpp
index 182834848..681a82978 100644
--- a/apps/nccl/src/allreduce.hpp
+++ b/apps/nccl/src/allreduce.hpp
@@ -10,8 +10,8 @@
 #include
 #include
 #include
-#include <mscclpp/nvls.hpp>
 #include
+#include <mscclpp/switch_channel.hpp>
 #include

 #if defined(ENABLE_NPKIT)
diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index 812d95cd2..24ab5dc6f 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -10,7 +10,7 @@
 #include
 #include
 #include
-#include <mscclpp/nvls.hpp>
+#include <mscclpp/switch_channel.hpp>
 #include
 #include
 #include
@@ -319,7 +319,7 @@ static std::vector<mscclpp::SwitchChannel> setupNvlsChannels(
   for (size_t idx = 0; idx < NUM_NVLS_CONNECTION; ++idx) {
     std::shared_ptr<mscclpp::NvlsConnection> nvlsConnection = conns[idx];
-    mscclpp::SwitchChannel SwitchChannel = nvlsConnection->bindAllocatedMemory((CUdeviceptr)buffer, bufferSize);
+    mscclpp::SwitchChannel SwitchChannel(nvlsConnection, buffer, bufferSize);
     channels.push_back(SwitchChannel);
   }
   return channels;
diff --git a/docs/cpp_api.rst b/docs/cpp_api.rst
index fbfd503d1..6edb3a46a 100644
--- a/docs/cpp_api.rst
+++ b/docs/cpp_api.rst
@@ -231,8 +231,6 @@ Channel Device Interfaces
 .. doxygenstruct:: mscclpp::BasePortChannelDeviceHandle
    :members:

-.. doxygenunion:: mscclpp::ChannelTrigger
-
 .. doxygenunion:: mscclpp::LL16Packet

 .. doxygenunion:: mscclpp::LL8Packet
diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp
index 6679b91c4..a61aaf5e7 100644
--- a/include/mscclpp/core.hpp
+++ b/include/mscclpp/core.hpp
@@ -378,41 +378,70 @@ struct Device {
   int id;
 };

-/// Used to configure an endpoint.
+/// Configuration for creating communication endpoints.
 struct EndpointConfig {
-  static const int DefaultMaxCqSize = 1024;
-  static const int DefaultMaxCqPollNum = 1;
-  static const int DefaultMaxSendWr = 8192;
-  static const int DefaultMaxWrPerSend = 64;
-
+  /// InfiniBand-specific configuration options that control queue pair behavior and performance characteristics.
+  /// These settings are only used when the transport is an InfiniBand type (IB0-IB7); they are ignored for other
+  /// transports.
+  struct Ib {
+    static const int DefaultMaxCqSize = 1024;
+    static const int DefaultMaxCqPollNum = 1;
+    static const int DefaultMaxSendWr = 8192;
+    static const int DefaultMaxWrPerSend = 64;
+
+    /// Maximum size of the completion queue.
+    int maxCqSize;
+    /// Maximum number of completion queue polls per operation.
+    int maxCqPollNum;
+    /// Maximum number of outstanding send work requests.
+    int maxSendWr;
+    /// Maximum number of work requests per send operation.
+    int maxWrPerSend;
+
+    /// Constructor.
+    /// @param maxCqSize Maximum completion queue size.
+    /// @param maxCqPollNum Maximum completion queue poll count.
+    /// @param maxSendWr Maximum outstanding send work requests.
+    /// @param maxWrPerSend Maximum work requests per send operation.
+    Ib(int maxCqSize = DefaultMaxCqSize, int maxCqPollNum = DefaultMaxCqPollNum, int maxSendWr = DefaultMaxSendWr,
+       int maxWrPerSend = DefaultMaxWrPerSend)
+        : maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), maxWrPerSend(maxWrPerSend) {}
+  };
+
+  /// NVLS-specific configuration options.
+  struct Nvls {
+    int numDevices;
+    size_t bufferSize;
+    bool isRoot;
+
+    /// Constructor.
+    /// @param numDevices Number of devices to enroll. Should be positive when using NVLS transport.
+    /// @param bufferSize NVLS buffer size. Should be positive when using NVLS transport.
+    /// @param isRoot Whether this is the root device in the NVLS group.
+    Nvls(int numDevices = -1, size_t bufferSize = 0, bool isRoot = false)
+        : numDevices(numDevices), bufferSize(bufferSize), isRoot(isRoot) {}
+  };
+
+  /// Communication transport type (e.g., CudaIpc, IB0-IB7, Ethernet).
   Transport transport;
+  /// Target device for the endpoint (GPU or CPU with optional device ID).
   Device device;
-  int ibMaxCqSize;
-  int ibMaxCqPollNum;
-  int ibMaxSendWr;
-  int ibMaxWrPerSend;
+  /// Maximum number of write requests that can be queued (-1 for default).
   int maxWriteQueueSize;
-
-  /// Constructor that takes a transport and sets the other fields to their default values.
-  ///
-  /// @param transport The transport to use.
-  /// @param device The device to use.
-  /// @param ibMaxCqSize The maximum completion queue size.
-  /// @param ibMaxCqPollNum The maximum completion queue poll number.
-  /// @param ibMaxSendWr The maximum send work requests.
-  /// @param ibMaxWrPerSend The maximum work requests per send.
-  /// @param maxWriteQueueSize The maximum write queue size.
-  EndpointConfig(Transport transport = Transport::Unknown, Device device = DeviceType::GPU,
-                 int ibMaxCqSize = DefaultMaxCqSize, int ibMaxCqPollNum = DefaultMaxCqPollNum,
-                 int ibMaxSendWr = DefaultMaxSendWr, int ibMaxWrPerSend = DefaultMaxWrPerSend,
-                 int maxWriteQueueSize = -1)
-      : transport(transport),
-        device(device),
-        ibMaxCqSize(ibMaxCqSize),
-        ibMaxCqPollNum(ibMaxCqPollNum),
-        ibMaxSendWr(ibMaxSendWr),
-        ibMaxWrPerSend(ibMaxWrPerSend),
-        maxWriteQueueSize(maxWriteQueueSize) {}
+  /// InfiniBand-specific options (used only for Transport::IBx).
+  Ib ib;
+  /// NVLS-specific options (used only for Transport::Nvls).
+  Nvls nvls;
+
+  /// Constructs endpoint configuration with specified transport, device, and optional settings.
+  /// @param transport Communication transport to use.
+  /// @param device Target device for the endpoint.
+  /// @param maxWriteQueueSize Maximum write queue size (-1 for system default).
+  /// @param ib IB-specific configuration.
+  /// @param nvls NVLS-specific configuration.
+  EndpointConfig(Transport transport = Transport::Unknown, Device device = DeviceType::GPU, int maxWriteQueueSize = -1,
+                 Ib ib = {}, Nvls nvls = {})
+      : transport(transport), device(device), maxWriteQueueSize(maxWriteQueueSize), ib(ib), nvls(nvls) {}
 };

 class Context;
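For orientation (not part of the patch): a minimal sketch of how the restructured `EndpointConfig` is meant to be used. Everything below appears in the diff above except the helper name `makeIbConfig`, which is hypothetical.

```cpp
#include <mscclpp/core.hpp>

// Hypothetical helper (sketch only): the flat ibMax* fields are gone; IB knobs
// now travel in the nested EndpointConfig::Ib struct.
mscclpp::EndpointConfig makeIbConfig() {
  mscclpp::EndpointConfig::Ib ib(/*maxCqSize=*/2048);  // remaining knobs keep their defaults
  return mscclpp::EndpointConfig(mscclpp::Transport::IB0, mscclpp::DeviceType::GPU,
                                 /*maxWriteQueueSize=*/-1, ib);
}
```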
@@ -426,6 +455,10 @@ class Endpoint {
   /// Constructor.
   Endpoint() = default;

+  /// Get the configuration used.
+  /// @return The configuration used.
+  const EndpointConfig& config() const;
+
   /// Get the transport used.
   /// @return The transport used.
   Transport transport() const;
@@ -687,9 +720,9 @@ class Semaphore {
   std::shared_ptr pimpl_;
 };

+/// Deprecated.
 template <typename T>
-using NonblockingFuture [[deprecated("Use std::shared_future instead. This will be removed in a future release.")]] =
-    std::shared_future<T>;
+using NonblockingFuture = std::shared_future<T>;

 /// A class that sets up all registered memories and connections between processes.
 ///
@@ -855,12 +888,20 @@ class Communicator {
   /// on the last future, it will start receiving the five RegisteredMemory or Connection objects in order,
   /// back to back.
   ///
-  /// @param localConfig The configuration for the local endpoint.
+  /// @param localEndpoint The local endpoint.
   /// @param remoteRank The rank of the remote process.
   /// @param tag The tag to use for identifying the send and receive.
   /// @return A future of shared pointer to the connection.
   ///
-  std::shared_future<std::shared_ptr<Connection>> connect(EndpointConfig localConfig, int remoteRank, int tag = 0);
+  std::shared_future<std::shared_ptr<Connection>> connect(const Endpoint& localEndpoint, int remoteRank, int tag = 0);
+
+  /// Connect to a remote rank. Wrapper of `connect(localEndpoint, remoteRank, tag)`.
+  /// @param localConfig The configuration for the local endpoint.
+  /// @param remoteRank The rank of the remote process.
+  /// @param tag The tag to use for identifying the send and receive.
+  /// @return A future of shared pointer to the connection.
+  std::shared_future<std::shared_ptr<Connection>> connect(const EndpointConfig& localConfig, int remoteRank,
+                                                          int tag = 0);

   [[deprecated("Use connect(localConfig, remoteRank, tag) instead. This will be removed in a future release.")]] std::
       shared_future<std::shared_ptr<Connection>>
@@ -873,6 +914,13 @@ class Communicator {
   }

   /// Build a semaphore for cross-process synchronization.
+  /// @param localStub The SemaphoreStub to be the local end of the semaphore.
+  /// @param remoteRank The rank of the remote process.
+  /// @param tag The tag to use for identifying the operation.
+  /// @return A future of the built semaphore.
+  std::shared_future<Semaphore> buildSemaphore(const SemaphoreStub& localStub, int remoteRank, int tag = 0);
+
+  /// Build a semaphore for cross-process synchronization. Wrapper of `buildSemaphore(localStub, remoteRank, tag)`.
   /// @param connection The connection associated with this semaphore.
   /// @param remoteRank The rank of the remote process.
   /// @param tag The tag to use for identifying the operation.
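Not part of the patch: a sketch of how the split overloads compose, assuming a `mscclpp::Communicator comm` and a peer at rank 1 are already set up.

```cpp
// The EndpointConfig overload is now a thin wrapper: it creates the endpoint
// and forwards to the Endpoint overload, so both calls below are equivalent.
mscclpp::EndpointConfig cfg(mscclpp::Transport::CudaIpc);
mscclpp::Endpoint ep = comm.context()->createEndpoint(cfg);
auto connFuture = comm.connect(ep, /*remoteRank=*/1);   // primary overload
// auto connFuture = comm.connect(cfg, 1);              // wrapper, same effect
std::shared_ptr<mscclpp::Connection> conn = connFuture.get();

// buildSemaphore similarly gains a SemaphoreStub-based primary overload; the
// Connection-based one forwards to buildSemaphore(SemaphoreStub(conn), ...).
auto semFuture = comm.buildSemaphore(mscclpp::SemaphoreStub(conn), /*remoteRank=*/1);
```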
 #define cuMemExportToShareableHandle(...) hipMemExportToShareableHandle(__VA_ARGS__)
 #define cuMemImportFromShareableHandle(...) hipMemImportFromShareableHandle(__VA_ARGS__)
+#define cuPointerGetAttribute(...) hipPointerGetAttribute(__VA_ARGS__)

-#else
+#else  // !defined(MSCCLPP_DEVICE_HIP)

 #include
 #include

-#endif
+#endif  // !defined(MSCCLPP_DEVICE_HIP)

 // NVLS
-#if !defined(__HIP_PLATFORM_AMD__)
+#if !defined(MSCCLPP_DEVICE_HIP)
 #include
 #if CUDART_VERSION < 12030
 #define CU_MEM_HANDLE_TYPE_FABRIC ((CUmemAllocationHandleType)0x8ULL)
 #endif
 // We need CUDA 12.3 above and kernel 5.6.0 above for NVLS API
 #define CUDA_NVLS_API_AVAILABLE ((CUDART_VERSION >= 12030) && (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0)))
-#else  // defined(__HIP_PLATFORM_AMD__)
+#else  // defined(MSCCLPP_DEVICE_HIP)
 #define CUDA_NVLS_API_AVAILABLE 0
 // NVLS is not supported on AMD platform, just to avoid compilation error
-#define CU_MEM_HANDLE_TYPE_FABRIC (0x8ULL)
-#endif  // !defined(__HIP_PLATFORM_AMD__)
+#define CU_MEM_HANDLE_TYPE_FABRIC ((hipMemAllocationHandleType)0x8ULL)
+#endif  // !defined(MSCCLPP_DEVICE_HIP)

 // GPU sync threads
-#if defined(__HIP_PLATFORM_AMD__)
+#if defined(MSCCLPP_DEVICE_HIP)
 #define __syncshm() asm volatile("s_waitcnt lgkmcnt(0) \n s_barrier");
 #else
 #define __syncshm() __syncthreads();
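Aside (not in the patch): the point of the new `cuMemGetAllocationGranularity` / `CU_MEM_ALLOC_GRANULARITY_MINIMUM` aliases is that driver-API code like the following sketch now compiles unchanged on HIP builds.

```cpp
#include <mscclpp/gpu.hpp>

// Query the minimum allocation granularity for a device; on AMD builds the
// macros above rewrite this into the equivalent hipMem* calls.
size_t minAllocGranularity(int deviceId) {
  CUmemAllocationProp prop = {};
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = deviceId;
  size_t gran = 0;
  cuMemGetAllocationGranularity(&gran, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM);
  return gran;
}
```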
diff --git a/include/mscclpp/gpu_utils.hpp b/include/mscclpp/gpu_utils.hpp
index 6ab7fe743..934235083 100644
--- a/include/mscclpp/gpu_utils.hpp
+++ b/include/mscclpp/gpu_utils.hpp
@@ -134,7 +134,6 @@ void* gpuCallocHost(size_t bytes, unsigned int flags);
 void* gpuCallocUncached(size_t bytes);
 #endif  // defined(__HIP_PLATFORM_AMD__)
 #if (CUDA_NVLS_API_AVAILABLE)
-extern CUmemAllocationHandleType nvlsCompatibleMemHandleType;
 void* gpuCallocPhysical(size_t bytes, size_t gran = 0, size_t align = 0);
 #endif  // CUDA_NVLS_API_AVAILABLE
diff --git a/include/mscclpp/memory_channel.hpp b/include/mscclpp/memory_channel.hpp
index 33fdb3e01..8aa486e23 100644
--- a/include/mscclpp/memory_channel.hpp
+++ b/include/mscclpp/memory_channel.hpp
@@ -4,12 +4,11 @@
 #ifndef MSCCLPP_MEMORY_CHANNEL_HPP_
 #define MSCCLPP_MEMORY_CHANNEL_HPP_

+#include <mscclpp/core.hpp>
+#include <mscclpp/memory_channel_device.hpp>
+#include <mscclpp/semaphore.hpp>
 #include

-#include "core.hpp"
-#include "memory_channel_device.hpp"
-#include "semaphore.hpp"
-
 namespace mscclpp {

 /// Memory channel without specifying source/destination memory regions.
diff --git a/include/mscclpp/nvls.hpp b/include/mscclpp/nvls.hpp
deleted file mode 100644
index 1740e0335..000000000
--- a/include/mscclpp/nvls.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-#ifndef MSCCLPP_NVLS_HPP_
-#define MSCCLPP_NVLS_HPP_
-
-#include
-#include
-
-namespace mscclpp {
-
-class NvlsConnection;
-
-struct SwitchChannel {
- private:
-  void* devicePtr_;
-  std::shared_ptr mcPtr_;
-  size_t bufferSize_;
-
- public:
-  using DeviceHandle = SwitchChannelDeviceHandle;
-  SwitchChannel(void* devicePtr, std::shared_ptr mcPtr, size_t bufferSize)
-      : devicePtr_(devicePtr), mcPtr_(mcPtr), bufferSize_(bufferSize) {}
-  DeviceHandle deviceHandle() const;
-  void* getDevicePtr();
-
-  friend class NvlsConnection;
-};
-
-class NvlsConnection {
- public:
-  NvlsConnection(size_t bufferSize, int numDevices);
-  NvlsConnection(const std::vector<char>& data);
-  NvlsConnection() = delete;
-  std::vector<char> serialize();
-
-  // Everyone needs to synchronize after creating a NVLS connection before adding devices
-  void addDevice();
-  void addDevice(int cudaDeviceId);
-
-  /// Bind the memory allocated via mscclpp::GpuBuffer to the multicast handle. The behavior
-  /// is undefined if the devicePtr is not allocated by mscclpp::GpuBuffer.
-  /// @param devicePtr The device pointer returned by `mscclpp::GpuBuffer::data()`.
-  /// @param size The bytes of the memory to bind to the multicast handle.
-  /// @return SwitchChannel with devicePtr, mcPtr and bufferSize
-  SwitchChannel bindAllocatedMemory(CUdeviceptr devicePtr, size_t size);
-
-  size_t getMultiCastMinGranularity();
-
- private:
-  class Impl;
-  std::shared_ptr pimpl_;
-};
-
-class Communicator;
-
-/// Connect to NVLS on setup.
-///
-/// This function used to connect to NVLS on setup. NVLS collective using multicast operations to send/recv data.
-/// Here we need to put all involved ranks into the collective group.
-///
-/// @param comm The communicator.
-/// @param allRanks The ranks of all processes involved in the collective.
-/// @param config The configuration for the local endpoint.
-/// @return std::shared_ptr A shared pointer to the NVLS connection.
-std::shared_ptr connectNvlsCollective(std::shared_ptr comm, std::vector allRanks,
-                                      size_t bufferSize);
-
-} // namespace mscclpp
-
-#endif // MSCCLPP_NVLS_HPP_
diff --git a/include/mscclpp/switch_channel.hpp b/include/mscclpp/switch_channel.hpp
new file mode 100644
index 000000000..79cbf4a8b
--- /dev/null
+++ b/include/mscclpp/switch_channel.hpp
@@ -0,0 +1,47 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_SWITCH_CHANNEL_HPP_
+#define MSCCLPP_SWITCH_CHANNEL_HPP_
+
+#include
+#include
+
+namespace mscclpp {
+
+struct NvlsConnection {
+  std::vector<std::shared_ptr<Connection>> rootPeerConnections;
+  std::shared_ptr<Connection> rootSelfConnection;
+  std::shared_ptr<Connection> connection;
+};
+
+class SwitchChannel {
+ private:
+  void* devicePtr_;
+  void* mcPtr_;
+  size_t bufferSize_;
+
+ public:
+  using DeviceHandle = SwitchChannelDeviceHandle;
+  SwitchChannel(std::shared_ptr<NvlsConnection> conn, void* data, size_t bytes);
+  DeviceHandle deviceHandle() const;
+  void* getDevicePtr();
+};
+
+class Communicator;
+
+/// Connect to NVLS.
+///
+/// NVLS collectives use multicast operations to send/recv data, so all ranks involved in the collective must be
+/// put into the same group.
+///
+/// @param comm The communicator.
+/// @param allRanks The ranks of all processes involved in the collective.
+/// @param bufferSize The size of the multicast buffer in bytes.
+/// @return std::shared_ptr A shared pointer to the NVLS connection.
+std::shared_ptr<NvlsConnection> connectNvlsCollective(std::shared_ptr<Communicator> comm, std::vector<int> allRanks,
+                                                      size_t bufferSize);
+
+}  // namespace mscclpp
+
+#endif  // MSCCLPP_SWITCH_CHANNEL_HPP_
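Illustration (not in the patch) of the new construction path, assuming `comm`, `allRanks`, a device pointer `buffer`, and `bufferSize` are already set up:

```cpp
// Old: nvlsConnection->bindAllocatedMemory((CUdeviceptr)buffer, bufferSize)
// New: construct the channel directly from the connection.
std::shared_ptr<mscclpp::NvlsConnection> nvlsConn =
    mscclpp::connectNvlsCollective(comm, allRanks, bufferSize);
mscclpp::SwitchChannel channel(nvlsConn, buffer, bufferSize);
mscclpp::SwitchChannel::DeviceHandle h = channel.deviceHandle();  // pass to kernels
```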
diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py
index 229a29028..7b3d78e05 100644
--- a/python/mscclpp/__init__.py
+++ b/python/mscclpp/__init__.py
@@ -30,6 +30,7 @@
     RegisteredMemory,
     PortChannel,
     MemoryChannel,
+    SwitchChannel,
     MemoryDevice2DeviceSemaphore,
     TcpBootstrap,
     Transport,
@@ -61,6 +62,7 @@
     "RegisteredMemory",
     "PortChannel",
     "MemoryChannel",
+    "SwitchChannel",
     "MemoryDevice2DeviceSemaphore",
     "TcpBootstrap",
     "Transport",
diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp
index 9fb803683..ae326d3d7 100644
--- a/python/mscclpp/core_py.cpp
+++ b/python/mscclpp/core_py.cpp
@@ -124,6 +124,17 @@ void register_core(nb::module_& m) {
       .def_rw("id", &Device::id)
       .def("__str__", [](const Device& self) { return std::to_string(self); });

+  nb::class_<EndpointConfig::Ib>(m, "EndpointConfigIb")
+      .def(nb::init<>())
+      .def(nb::init<int, int, int, int>(), nb::arg("maxCqSize") = EndpointConfig::Ib::DefaultMaxCqSize,
+           nb::arg("maxCqPollNum") = EndpointConfig::Ib::DefaultMaxCqPollNum,
+           nb::arg("maxSendWr") = EndpointConfig::Ib::DefaultMaxSendWr,
+           nb::arg("maxWrPerSend") = EndpointConfig::Ib::DefaultMaxWrPerSend)
+      .def_rw("max_cq_size", &EndpointConfig::Ib::maxCqSize)
+      .def_rw("max_cq_poll_num", &EndpointConfig::Ib::maxCqPollNum)
+      .def_rw("max_send_wr", &EndpointConfig::Ib::maxSendWr)
+      .def_rw("max_wr_per_send", &EndpointConfig::Ib::maxWrPerSend);
+
   nb::class_<RegisteredMemory>(m, "RegisteredMemory")
       .def(nb::init<>())
       .def("data", [](RegisteredMemory& self) { return reinterpret_cast<uintptr_t>(self.data()); })
@@ -158,17 +169,23 @@ void register_core(nb::module_& m) {
   nb::class_<EndpointConfig>(m, "EndpointConfig")
       .def(nb::init<>())
       .def(nb::init_implicit<Transport>(), nb::arg("transport"))
-      .def(nb::init<Transport, Device, int, int, int, int, int>(), nb::arg("transport"), nb::arg("device"),
-           nb::arg("ibMaxCqSize") = EndpointConfig::DefaultMaxCqSize,
-           nb::arg("ibMaxCqPollNum") = EndpointConfig::DefaultMaxCqPollNum,
-           nb::arg("ibMaxSendWr") = EndpointConfig::DefaultMaxSendWr,
-           nb::arg("ibMaxWrPerSend") = EndpointConfig::DefaultMaxWrPerSend, nb::arg("maxWriteQueueSize") = -1)
+      .def(nb::init<Transport, Device, int, EndpointConfig::Ib>(), nb::arg("transport"), nb::arg("device"),
+           nb::arg("maxWriteQueueSize") = -1, nb::arg("ib") = EndpointConfig::Ib{})
       .def_rw("transport", &EndpointConfig::transport)
       .def_rw("device", &EndpointConfig::device)
-      .def_rw("ib_max_cq_size", &EndpointConfig::ibMaxCqSize)
-      .def_rw("ib_max_cq_poll_num", &EndpointConfig::ibMaxCqPollNum)
-      .def_rw("ib_max_send_wr", &EndpointConfig::ibMaxSendWr)
-      .def_rw("ib_max_wr_per_send", &EndpointConfig::ibMaxWrPerSend)
+      .def_rw("ib", &EndpointConfig::ib)
+      .def_prop_rw(
+          "ib_max_cq_size", [](EndpointConfig& self) { return self.ib.maxCqSize; },
+          [](EndpointConfig& self, int v) { self.ib.maxCqSize = v; })
+      .def_prop_rw(
+          "ib_max_cq_poll_num", [](EndpointConfig& self) { return self.ib.maxCqPollNum; },
+          [](EndpointConfig& self, int v) { self.ib.maxCqPollNum = v; })
+      .def_prop_rw(
+          "ib_max_send_wr", [](EndpointConfig& self) { return self.ib.maxSendWr; },
+          [](EndpointConfig& self, int v) { self.ib.maxSendWr = v; })
+      .def_prop_rw(
+          "ib_max_wr_per_send", [](EndpointConfig& self) { return self.ib.maxWrPerSend; },
+          [](EndpointConfig& self, int v) { self.ib.maxWrPerSend = v; })
       .def_rw("max_write_queue_size", &EndpointConfig::maxWriteQueueSize);

   nb::class_<Context>(m, "Context")
@@ -212,13 +229,15 @@ void register_core(nb::module_& m) {
       .def("send_memory", &Communicator::sendMemory, nb::arg("memory"), nb::arg("remoteRank"), nb::arg("tag") = 0)
       .def("recv_memory", &Communicator::recvMemory, nb::arg("remoteRank"), nb::arg("tag") = 0)
       .def("connect",
-           static_cast<std::shared_future<std::shared_ptr<Connection>> (Communicator::*)(EndpointConfig, int, int)>(
+           static_cast<std::shared_future<std::shared_ptr<Connection>> (Communicator::*)(const Endpoint&, int, int)>(
               &Communicator::connect),
-           nb::arg("localConfig"), nb::arg("remoteRank"), nb::arg("tag") = 0)
+           nb::arg("localEndpoint"), nb::arg("remoteRank"), nb::arg("tag") = 0)
+      .def("connect", [](Communicator* self, const EndpointConfig& localConfig, int remoteRank,
+                         int tag = 0) { return self->connect(localConfig, remoteRank, tag); })
       .def(
           "connect",
-          [](Communicator* self, int remoteRank, int tag, EndpointConfig localConfig) {
-            return self->connect(std::move(localConfig), remoteRank, tag);
+          [](Communicator* self, int remoteRank, int tag, const EndpointConfig& localConfig) {
+            return self->connect(localConfig, remoteRank, tag);
           },
           nb::arg("remoteRank"), nb::arg("tag"), nb::arg("localConfig"))
       .def(
@@ -229,8 +248,12 @@ void register_core(nb::module_& m) {
           nb::arg("remoteRank"), nb::arg("tag"), nb::arg("localConfig"))
       .def("send_memory_on_setup", &Communicator::sendMemory, nb::arg("memory"), nb::arg("remoteRank"), nb::arg("tag"))
       .def("recv_memory_on_setup", &Communicator::recvMemory, nb::arg("remoteRank"), nb::arg("tag"))
-      .def("build_semaphore", &Communicator::buildSemaphore, nb::arg("localFlag"), nb::arg("remoteRank"),
-           nb::arg("tag") = 0)
+      .def("build_semaphore",
+           static_cast<std::shared_future<Semaphore> (Communicator::*)(const SemaphoreStub&, int, int)>(
+               &Communicator::buildSemaphore),
+           nb::arg("localStub"), nb::arg("remoteRank"), nb::arg("tag") = 0)
+      .def("build_semaphore", [](Communicator* self, std::shared_ptr<Connection> conn, int remoteRank,
+                                 int tag = 0) { return self->buildSemaphore(conn, remoteRank, tag); })
      .def("remote_rank_of", &Communicator::remoteRankOf)
       .def("tag_of", &Communicator::tagOf)
       .def("setup", [](Communicator*) {});
diff --git a/python/mscclpp/nvls_py.cpp b/python/mscclpp/nvls_py.cpp
index 78cedefbc..42a1bb503 100644
--- a/python/mscclpp/nvls_py.cpp
+++ b/python/mscclpp/nvls_py.cpp
@@ -9,13 +9,18 @@
 #include
 #include

-#include <mscclpp/nvls.hpp>
+#include <mscclpp/switch_channel.hpp>

 namespace nb = nanobind;
 using namespace mscclpp;

 void register_nvls(nb::module_& m) {
   nb::class_<SwitchChannel>(m, "SwitchChannel")
+      .def("__init__",
+           [](SwitchChannel* switchChannel, std::shared_ptr<NvlsConnection> nvlsConnection, uintptr_t buffer,
+              size_t bufferSize) {
+             new (switchChannel) SwitchChannel(nvlsConnection, reinterpret_cast<void*>(buffer), bufferSize);
+           })
       .def("get_device_ptr", [](SwitchChannel* self) { return (uintptr_t)self->getDevicePtr(); })
       .def("device_handle", &SwitchChannel::deviceHandle);

@@ -28,10 +33,6 @@
         return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
       });

-  nb::class_<NvlsConnection>(m, "NvlsConnection")
-      .def("bind_allocated_memory", &NvlsConnection::bindAllocatedMemory, nb::arg("devicePtr"), nb::arg("size"))
-      .def("get_multicast_min_granularity", &NvlsConnection::getMultiCastMinGranularity);
-
   m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("allRanks"),
         nb::arg("bufferSize"));
 }
diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py
index 629cb2c5d..261feb3e4 100644
--- a/python/test/test_mscclpp.py
+++ b/python/test/test_mscclpp.py
@@ -30,6 +30,7 @@
     env,
     Device,
     DeviceType,
+    SwitchChannel,
 )
 import mscclpp.comm as mscclpp_comm
 from mscclpp.utils import KernelBuilder, GpuBuffer, pack
@@ -343,13 +344,13 @@ def test_nvls_connection(mpi_group: MpiGroup):
     memory1 = GpuBuffer(2**29, cp.int8)
     memory2 = GpuBuffer(2**29, cp.int8)
     memory3 = GpuBuffer(2**29, cp.int8)
-    mem_handle1 = nvls_connection.bind_allocated_memory(memory1.data.ptr, memory1.data.mem.size)
-    mem_handle2 = nvls_connection.bind_allocated_memory(memory2.data.ptr, memory2.data.mem.size)
+    chan1 = SwitchChannel(nvls_connection, memory1.data.ptr, memory1.data.mem.size)
+    chan2 = SwitchChannel(nvls_connection, memory2.data.ptr, memory2.data.mem.size)
     with pytest.raises(Exception):
-        mem_handle3 = nvls_connection.bind_allocated_memory(memory3.data.ptr, memory3.data.mem.size)
-    # the memory is freed on the destructor of mem_handle2
-    mem_handle2 = None
-    mem_handle3 = nvls_connection.bind_allocated_memory(memory3.data.ptr, memory3.data.mem.size)
+        chan3 = SwitchChannel(nvls_connection, memory3.data.ptr, memory3.data.mem.size)
+    # the memory is freed by the destructor of chan2
+    chan2 = None
+    chan3 = SwitchChannel(nvls_connection, memory3.data.ptr, memory3.data.mem.size)


 class MscclppKernel:
diff --git a/src/communicator.cc b/src/communicator.cc
index 305087a55..58f0f1045 100644
--- a/src/communicator.cc
+++ b/src/communicator.cc
@@ -99,25 +99,23 @@ MSCCLPP_API_CPP std::shared_future<RegisteredMemory> Communicator::recvMemory(in
   return shared_future;
 }

-MSCCLPP_API_CPP std::shared_future<std::shared_ptr<Connection>> Communicator::connect(EndpointConfig localConfig,
+MSCCLPP_API_CPP std::shared_future<std::shared_ptr<Connection>> Communicator::connect(const Endpoint& localEndpoint,
                                                                                       int remoteRank, int tag) {
-  auto localEndpoint = context()->createEndpoint(localConfig);
-
   if (remoteRank == bootstrap()->getRank()) {
     // Connection to self
-    auto remoteEndpoint = context()->createEndpoint(localConfig);
+    auto remoteEndpoint = context()->createEndpoint(localEndpoint.config());
     auto connection = context()->connect(localEndpoint, remoteEndpoint);
     std::promise<std::shared_ptr<Connection>> promise;
     promise.set_value(connection);
     pimpl_->connectionInfos_[connection.get()] = {remoteRank, tag};
-    return std::shared_future<std::shared_ptr<Connection>>(std::move(promise.get_future()));
+    return std::shared_future<std::shared_ptr<Connection>>(promise.get_future());
   }
   bootstrap()->send(localEndpoint.serialize(), remoteRank, tag);

   auto future = std::async(std::launch::deferred,
                            [this, remoteRank, tag, lastRecvItem = pimpl_->getLastRecvItem(remoteRank, tag),
-                            localEndpoint = std::move(localEndpoint)]() mutable {
+                            localEndpoint = localEndpoint]() mutable {
                              if (lastRecvItem) {
                                // Recursive call to the previous receive items
                                lastRecvItem->wait();
@@ -134,14 +132,19 @@ MSCCLPP_API_CPP std::shared_future<std::shared_ptr<Connection>> Communicator::co
   return shared_future;
 }

+MSCCLPP_API_CPP std::shared_future<std::shared_ptr<Connection>> Communicator::connect(const EndpointConfig& localConfig,
+                                                                                      int remoteRank, int tag) {
+  auto localEndpoint = context()->createEndpoint(localConfig);
+  return connect(localEndpoint, remoteRank, tag);
+}
+
 MSCCLPP_API_CPP std::shared_future<std::shared_ptr<Connection>> Communicator::connect(int remoteRank, int tag,
                                                                                       EndpointConfig localConfig) {
   return connect(localConfig, remoteRank, tag);
 }

-MSCCLPP_API_CPP std::shared_future<Semaphore> Communicator::buildSemaphore(std::shared_ptr<Connection> connection,
+MSCCLPP_API_CPP std::shared_future<Semaphore> Communicator::buildSemaphore(const SemaphoreStub& localStub,
                                                                            int remoteRank, int tag) {
-  SemaphoreStub localStub(connection);
   bootstrap()->send(localStub.serialize(), remoteRank, tag);

   auto future =
@@ -161,6 +164,11 @@ MSCCLPP_API_CPP std::shared_future<Semaphore> Communicator::buildSemaphore(std::
   return shared_future;
 }

+MSCCLPP_API_CPP std::shared_future<Semaphore> Communicator::buildSemaphore(std::shared_ptr<Connection> connection,
+                                                                           int remoteRank, int tag) {
+  return buildSemaphore(SemaphoreStub(connection), remoteRank, tag);
+}
+
 MSCCLPP_API_CPP int Communicator::remoteRankOf(const Connection& connection) {
   return pimpl_->connectionInfos_.at(&connection).remoteRank;
 }
diff --git a/src/connection.cc b/src/connection.cc
index b3165a863..d8039ca95 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -94,6 +94,31 @@ CudaIpcConnection::CudaIpcConnection(std::shared_ptr<Context> context, const End
   }
 #endif  // !defined(MSCCLPP_DEVICE_HIP)
   stream_ = ctxImpl.ipcStreams_.back();
+
+  const auto& localCfg = localEndpoint.config();
+  const auto& remoteCfg = remoteEndpoint.config();
+  if (localCfg.nvls.numDevices > 0 && localCfg.nvls.bufferSize > 0 && remoteCfg.nvls.numDevices > 0 &&
+      remoteCfg.nvls.bufferSize > 0) {
+    if (!localCfg.nvls.isRoot && !remoteCfg.nvls.isRoot) {
+      throw Error("Failed to enable NVLS: one endpoint should be the root, but both endpoints are non-root",
+                  ErrorCode::InvalidUsage);
+    } else if (localCfg.nvls.isRoot && remoteCfg.nvls.isRoot) {
+      throw Error("Failed to enable NVLS: only one endpoint should be a root, but both endpoints are root",
+                  ErrorCode::InvalidUsage);
+    }
+    if (localCfg.nvls.numDevices != remoteCfg.nvls.numDevices ||
+        localCfg.nvls.bufferSize != remoteCfg.nvls.bufferSize) {
+      throw Error("Failed to enable NVLS: endpoint configurations do not match", ErrorCode::InvalidUsage);
+    }
+    if (remoteCfg.nvls.isRoot) {
+      auto& rootNvlsHandle = getImpl(remoteEndpoint).nvlsHandle_;
+      if (!rootNvlsHandle) {
+        throw Error("Root's NVLS handle is empty", ErrorCode::InternalError);
+      }
+      nvlsMem_ = std::make_shared<GpuIpcMem>(*rootNvlsHandle);
+      nvlsNumDevs_ = remoteCfg.nvls.numDevices;
+    }
+  }
 }

 Transport CudaIpcConnection::transport() const { return Transport::CudaIpc; }
@@ -167,9 +192,6 @@ IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& loc
       transport_(localEndpoint.transport()),
       remoteTransport_(remoteEndpoint.transport()),
       dummyAtomicSource_(std::make_unique<uint64_t>(0)) {
-  if (maxWriteQueueSize_ == -1) {
-    maxWriteQueueSize_ = EndpointConfig::DefaultMaxCqSize;
-  }
   qp_ = getImpl(localEndpoint).ibQp_;
   qp_.lock()->rtr(getImpl(remoteEndpoint).ibQpInfo_);
   qp_.lock()->rts();
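Sketch (not in the patch) of a config pair that passes the NVLS checks above: exactly one root, identical `numDevices`/`bufferSize` on both sides.

```cpp
// Both sides agree on numDevices and bufferSize; only one side sets isRoot.
mscclpp::EndpointConfig::Nvls rootNvls(/*numDevices=*/8, /*bufferSize=*/size_t(1) << 29, /*isRoot=*/true);
mscclpp::EndpointConfig::Nvls peerNvls(/*numDevices=*/8, /*bufferSize=*/size_t(1) << 29, /*isRoot=*/false);
mscclpp::EndpointConfig rootCfg(mscclpp::Transport::CudaIpc, mscclpp::DeviceType::GPU, -1, {}, rootNvls);
mscclpp::EndpointConfig peerCfg(mscclpp::Transport::CudaIpc, mscclpp::DeviceType::GPU, -1, {}, peerNvls);
// Connecting rootCfg<->rootCfg or peerCfg<->peerCfg would throw InvalidUsage.
```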
diff --git a/src/context.cc b/src/context.cc
index b58987b20..610ad27e1 100644
--- a/src/context.cc
+++ b/src/context.cc
@@ -4,6 +4,7 @@
 #include "context.hpp"

 #include
+#include

 #include "api.h"
 #include "connection.hpp"
@@ -76,21 +77,21 @@ MSCCLPP_API_CPP std::shared_ptr<Connection> Context::connect(const Endpoint &loc
   if (remoteEndpoint.device().type == DeviceType::GPU && remoteEndpoint.device().id < 0) {
     throw Error("No GPU device ID provided for remote endpoint", ErrorCode::InvalidUsage);
   }
+  auto localTransport = localEndpoint.transport();
+  auto remoteTransport = remoteEndpoint.transport();
+  if (localTransport != remoteTransport &&
+      !(AllIBTransports.has(localTransport) && AllIBTransports.has(remoteTransport))) {
+    std::stringstream ss;
+    ss << "Transport mismatch between local (" << std::to_string(localTransport) << ") and remote ("
+       << std::to_string(remoteEndpoint.transport()) << ") endpoints";
+    throw Error(ss.str(), ErrorCode::InvalidUsage);
+  }
   std::shared_ptr<Connection> conn;
-  if (localEndpoint.transport() == Transport::CudaIpc) {
-    if (remoteEndpoint.transport() != Transport::CudaIpc) {
-      throw Error("Local transport is CudaIpc but remote is not", ErrorCode::InvalidUsage);
-    }
+  if (localTransport == Transport::CudaIpc) {
     conn = std::make_shared<CudaIpcConnection>(shared_from_this(), localEndpoint, remoteEndpoint);
-  } else if (AllIBTransports.has(localEndpoint.transport())) {
-    if (!AllIBTransports.has(remoteEndpoint.transport())) {
-      throw Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage);
-    }
+  } else if (AllIBTransports.has(localTransport)) {
     conn = std::make_shared<IBConnection>(shared_from_this(), localEndpoint, remoteEndpoint);
-  } else if (localEndpoint.transport() == Transport::Ethernet) {
-    if (remoteEndpoint.transport() != Transport::Ethernet) {
-      throw Error("Local transport is Ethernet but remote is not", ErrorCode::InvalidUsage);
-    }
+  } else if (localTransport == Transport::Ethernet) {
     conn = std::make_shared<EthernetConnection>(shared_from_this(), localEndpoint, remoteEndpoint);
   } else {
     throw Error("Unsupported transport", ErrorCode::InternalError);
diff --git a/src/core.cc b/src/core.cc
index e60058158..bb3dca383 100644
--- a/src/core.cc
+++ b/src/core.cc
@@ -99,9 +99,14 @@ std::string to_string(const mscclpp::Transport& transport) {
   return TransportNames[static_cast<int>(transport)];
 }

+std::string to_string(const mscclpp::DeviceType& deviceType) {
+  static const std::string DeviceTypeNames[] = {"Unknown", "CPU", "GPU"};
+  return DeviceTypeNames[static_cast<int>(deviceType)];
+}
+
 std::string to_string(const mscclpp::Device& device) {
   std::stringstream ss;
-  ss << "Device(type=" << to_string(device.type) << ", id=" << device.id << ")";
+  ss << "Device(type=" << std::to_string(device.type) << ", id=" << device.id << ")";
   return ss.str();
 }
diff --git a/src/endpoint.cc b/src/endpoint.cc
index c8c41b20f..0934a6690 100644
--- a/src/endpoint.cc
+++ b/src/endpoint.cc
@@ -13,21 +13,25 @@
 namespace mscclpp {

-Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl)
-    : transport_(config.transport),
-      device_(config.device),
-      hostHash_(getHostHash()),
-      pidHash_(getPidHash()),
-      maxWriteQueueSize_(config.maxWriteQueueSize) {
-  if (device_.type == DeviceType::GPU && device_.id < 0) {
-    MSCCLPP_CUDATHROW(cudaGetDevice(&(device_.id)));
+Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl)
+    : config_(config), hostHash_(getHostHash()), pidHash_(getPidHash()) {
+  if (config_.device.type == DeviceType::GPU && config_.device.id < 0) {
+    MSCCLPP_CUDATHROW(cudaGetDevice(&(config_.device.id)));
   }
-  if (AllIBTransports.has(transport_)) {
+  if (config_.transport == Transport::CudaIpc) {
+    if (config_.nvls.isRoot) {
+      nvlsHandle_ = GpuIpcMemHandle::createMulticast(config_.nvls.bufferSize, config_.nvls.numDevices);
+    }
+  } else if (AllIBTransports.has(config_.transport)) {
     ibLocal_ = true;
-    ibQp_ = contextImpl.getIbContext(transport_)
-                ->createQp(config.ibMaxCqSize, config.ibMaxCqPollNum, config.ibMaxSendWr, 0, config.ibMaxWrPerSend);
+    if (config_.maxWriteQueueSize <= 0) {
+      config_.maxWriteQueueSize = config_.ib.maxCqSize;
+    }
+    ibQp_ =
+        contextImpl.getIbContext(config_.transport)
+            ->createQp(config_.ib.maxCqSize, config_.ib.maxCqPollNum, config_.ib.maxSendWr, 0, config_.ib.maxWrPerSend);
     ibQpInfo_ = ibQp_->getInfo();
-  } else if (transport_ == Transport::Ethernet) {
+  } else if (config_.transport == Transport::Ethernet) {
     // Configuring Ethernet Interfaces
     abortFlag_ = 0;
     int ret = FindInterfaces(netIfName_, &socketAddress_, MAX_IF_NAME_SIZE, 1);
@@ -42,41 +46,54 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl)

 Endpoint::Impl::Impl(const std::vector<char>& serialization) {
   auto it = serialization.begin();
-  it = detail::deserialize(it, transport_);
-  it = detail::deserialize(it, device_);
+  it = detail::deserialize(it, config_);
   it = detail::deserialize(it, hostHash_);
   it = detail::deserialize(it, pidHash_);
-  if (AllIBTransports.has(transport_)) {
+  if (config_.transport == Transport::CudaIpc) {
+    if (config_.nvls.isRoot) {
+      nvlsHandle_.reset(new GpuIpcMemHandle());
+      it = detail::deserialize(it, *nvlsHandle_);
+    }
+  } else if (AllIBTransports.has(config_.transport)) {
     ibLocal_ = false;
     it = detail::deserialize(it, ibQpInfo_);
-  }
-  if (transport_ == Transport::Ethernet) {
+  } else if (config_.transport == Transport::Ethernet) {
     it = detail::deserialize(it, socketAddress_);
   }
+  if (it != serialization.end()) {
+    throw Error("Endpoint deserialization failed", ErrorCode::Aborted);
+  }
 }

 MSCCLPP_API_CPP Endpoint::Endpoint(std::shared_ptr<Impl> pimpl) : pimpl_(pimpl) {}

-MSCCLPP_API_CPP Transport Endpoint::transport() const { return pimpl_->transport_; }
+MSCCLPP_API_CPP const EndpointConfig& Endpoint::config() const { return pimpl_->config_; }

-MSCCLPP_API_CPP const Device& Endpoint::device() const { return pimpl_->device_; }
+MSCCLPP_API_CPP Transport Endpoint::transport() const { return pimpl_->config_.transport; }
+
+MSCCLPP_API_CPP const Device& Endpoint::device() const { return pimpl_->config_.device; }

 MSCCLPP_API_CPP uint64_t Endpoint::hostHash() const { return pimpl_->hostHash_; }

 MSCCLPP_API_CPP uint64_t Endpoint::pidHash() const { return pimpl_->pidHash_; }

-MSCCLPP_API_CPP int Endpoint::maxWriteQueueSize() const { return pimpl_->maxWriteQueueSize_; }
+MSCCLPP_API_CPP int Endpoint::maxWriteQueueSize() const { return pimpl_->config_.maxWriteQueueSize; }

 MSCCLPP_API_CPP std::vector<char> Endpoint::serialize() const {
   std::vector<char> data;
-  detail::serialize(data, pimpl_->transport_);
-  detail::serialize(data, pimpl_->device_);
+  detail::serialize(data, pimpl_->config_);
   detail::serialize(data, pimpl_->hostHash_);
   detail::serialize(data, pimpl_->pidHash_);
-  if (AllIBTransports.has(pimpl_->transport_)) {
+  if (pimpl_->config_.transport == Transport::CudaIpc) {
+    if (pimpl_->config_.nvls.isRoot) {
+      if (!(pimpl_->nvlsHandle_)) {
+        throw Error("NVLS handle is not initialized", ErrorCode::InternalError);
+      }
+      detail::serialize(data, *(pimpl_->nvlsHandle_));
+    }
+  } else if (AllIBTransports.has(pimpl_->config_.transport)) {
     detail::serialize(data, pimpl_->ibQpInfo_);
-  }
-  if ((pimpl_->transport_) == Transport::Ethernet) {
+  } else if (pimpl_->config_.transport == Transport::Ethernet) {
     detail::serialize(data, pimpl_->socketAddress_);
   }
   return data;
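For reference (not in the patch), the wire layout these two functions imply:

```cpp
// Serialized Endpoint layout (illustrative; the bytes are whatever
// detail::serialize packs, in this order):
//
//   [EndpointConfig config][uint64_t hostHash][uint64_t pidHash][tail]
//
// where the tail depends on config.transport:
//   CudaIpc with nvls.isRoot  -> GpuIpcMemHandle (the multicast handle)
//   IB0..IB7                  -> IbQpInfo
//   Ethernet                  -> SocketAddress
//
// Trailing bytes now make deserialization throw
// "Endpoint deserialization failed" (ErrorCode::Aborted).
```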
diff --git a/src/executor/executor.cc b/src/executor/executor.cc
index 809af43f3..eac491cec 100644
--- a/src/executor/executor.cc
+++ b/src/executor/executor.cc
@@ -3,8 +3,8 @@

 #include
 #include
-#include <mscclpp/nvls.hpp>
 #include
+#include <mscclpp/switch_channel.hpp>
 #include

 #include "debug.h"
@@ -364,8 +364,7 @@ struct Executor::Impl {
       NvlsInfo info = nvlsInfos[i];
       auto bufferInfo = getBufferInfo(info.bufferType, sendbuff, recvbuff, context.scratchBuffer.get(), sendBuffSize,
                                       recvBuffSize, scratchBuffSize);
-      SwitchChannel switchChannel =
-          nvlsConnection->bindAllocatedMemory((CUdeviceptr)bufferInfo.first, bufferInfo.second);
+      SwitchChannel switchChannel(nvlsConnection, bufferInfo.first, bufferInfo.second);
       context.nvlsChannels.push_back(switchChannel);
     }
   }
diff --git a/src/gpu_ipc_mem.cc b/src/gpu_ipc_mem.cc
new file mode 100644
index 000000000..d775f334d
--- /dev/null
+++ b/src/gpu_ipc_mem.cc
@@ -0,0 +1,422 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include "gpu_ipc_mem.hpp"
+
+#include
+#include
+#include
+
+#include
+#include
+
+#include "debug.h"
+
+namespace std {
+
+std::string to_string(const mscclpp::GpuIpcMemHandle::TypeFlags& typeFlags) {
+  std::stringstream ss;
+  if (typeFlags & mscclpp::GpuIpcMemHandle::Type::RuntimeIpc) {
+    ss << "RuntimeIpc|";
+  }
+  if (typeFlags & mscclpp::GpuIpcMemHandle::Type::PosixFd) {
+    ss << "PosixFd|";
+  }
+  if (typeFlags & mscclpp::GpuIpcMemHandle::Type::Fabric) {
+    ss << "Fabric|";
+  }
+  std::string ret = ss.str();
+  if (!ret.empty()) {
+    ret.pop_back();  // Remove the trailing '|'
+    return ret;
+  }
+  return "None";
+}
+
+}  // namespace std
+
+namespace mscclpp {
+
+static int dupFdFromPid([[maybe_unused]] pid_t pid, [[maybe_unused]] int targetFd) {
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0))
+  // Linux pidfd based cross-process fd duplication
+  int pidfd = syscall(SYS_pidfd_open, pid, 0);
+  if (pidfd < 0) {
+    return -1;
+  }
+  int dupfd = syscall(SYS_pidfd_getfd, pidfd, targetFd, 0);
+  if (dupfd < 0) {
+    close(pidfd);
+    return -1;
+  }
+  close(pidfd);
+  return dupfd;
+#else
+  return -1;
+#endif
+}
+
+[[maybe_unused]] static bool isFabricMemHandleAvailable() {
+#if (CUDA_NVLS_API_AVAILABLE)
+  CUdevice currentDevice;
+  int isFabricSupported;
+  MSCCLPP_CUTHROW(cuCtxGetDevice(&currentDevice));
+  MSCCLPP_CUTHROW(
+      cuDeviceGetAttribute(&isFabricSupported, CU_DEVICE_ATTRIBUTE_HANDLE_TYPE_FABRIC_SUPPORTED, currentDevice));
+  if (isFabricSupported == 0) {
+    return false;
+  }
+
+  CUmemAllocationProp prop = {};
+  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
+  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
+  prop.location.id = currentDevice;
+  prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_FABRIC;
+
+  size_t minGran;
+  MSCCLPP_CUTHROW(cuMemGetAllocationGranularity(&minGran, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM));
+
+  // try allocating minimal amount of memory
+  CUmemGenericAllocationHandle memHandle;
+  CUresult result = cuMemCreate(&memHandle, minGran, &prop, 0);
+  if (result == CUDA_ERROR_NOT_PERMITTED || result == CUDA_ERROR_NOT_SUPPORTED) {
+    // unprivileged user or old kernel version
+    return false;
+  } else {
+    MSCCLPP_CUTHROW(result);
+  }
+
+  // it worked; cleanup now
+  MSCCLPP_CUTHROW(cuMemRelease(memHandle));
+  return true;
+#else   // !(CUDA_NVLS_API_AVAILABLE)
+  return false;
+#endif  // !(CUDA_NVLS_API_AVAILABLE)
+}
+
+void GpuIpcMemHandle::deleter(GpuIpcMemHandle* handle) {
+  if (handle) {
+    if (handle->typeFlags & GpuIpcMemHandle::Type::PosixFd) {
+      ::close(handle->posixFd.fd);
+    }
+  }
+}
+
+UniqueGpuIpcMemHandle GpuIpcMemHandle::create(const CUdeviceptr ptr) {
+  auto handle = UniqueGpuIpcMemHandle(new GpuIpcMemHandle(), &GpuIpcMemHandle::deleter);
+  handle->typeFlags = GpuIpcMemHandle::Type::None;
+
+  CUdeviceptr basePtr;
+  size_t sz;
+  MSCCLPP_CUTHROW(cuMemGetAddressRange(&basePtr, &sz, ptr));
+  if (sz == 0) return handle;  // No valid memory range found
+  handle->baseSize = sz;
+  handle->offsetFromBase = size_t(ptr) - size_t(basePtr);
+
+  // Runtime IPC handle
+  cudaError_t err = cudaIpcGetMemHandle(&handle->runtimeIpc.handle, (void*)basePtr);
+  if (err == cudaSuccess) {
+    handle->typeFlags |= GpuIpcMemHandle::Type::RuntimeIpc;
+  } else {
+    (void)cudaGetLastError();
+  }
+
+#if !defined(MSCCLPP_DEVICE_HIP)  // Remove when HIP fully supports virtual memory management APIs
+  CUmemGenericAllocationHandle allocHandle;
+  CUresult res = cuMemRetainAllocationHandle(&allocHandle, (void*)basePtr);
+  if (res == CUDA_ERROR_NOT_SUPPORTED || res == CUDA_ERROR_INVALID_VALUE) {
+    // Not supported on this platform or not mapped by cuMem API
+    return handle;
+  }
+  MSCCLPP_CUTHROW(res);
+
+  // POSIX FD handle
+  if (cuMemExportToShareableHandle(&(handle->posixFd.fd), allocHandle, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0) ==
+      CUDA_SUCCESS) {
+    handle->posixFd.pid = getpid();
+    handle->typeFlags |= GpuIpcMemHandle::Type::PosixFd;
+  }
+
+  // FABRIC handle
+  if (cuMemExportToShareableHandle(&(handle->fabric.handle), allocHandle, CU_MEM_HANDLE_TYPE_FABRIC, 0) ==
+      CUDA_SUCCESS) {
+    handle->typeFlags |= GpuIpcMemHandle::Type::Fabric;
+  }
+
+  MSCCLPP_CUTHROW(cuMemRelease(allocHandle));
+#endif  // !defined(MSCCLPP_DEVICE_HIP)
+
+  return handle;
+}
+
+UniqueGpuIpcMemHandle GpuIpcMemHandle::createMulticast([[maybe_unused]] size_t bufferSize,
+                                                       [[maybe_unused]] int numDevices) {
+#if (CUDA_NVLS_API_AVAILABLE)
+  if (bufferSize == 0) {
+    throw Error("Multicasting buffer size should be positive", ErrorCode::InvalidUsage);
+  }
+  if (numDevices < 1) {
+    throw Error("Multicasting number of devices should be positive", ErrorCode::InvalidUsage);
+  }
+  auto handle = UniqueGpuIpcMemHandle(new GpuIpcMemHandle(), &GpuIpcMemHandle::deleter);
+
+  bool isFabricAvailable = isFabricMemHandleAvailable();
+
+  // get granularity
+  size_t recMcGran;
+  CUmulticastObjectProp prop;
+  prop.size = bufferSize;
+  prop.numDevices = numDevices;
+  if (isFabricAvailable) {
+    prop.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR | CU_MEM_HANDLE_TYPE_FABRIC;
+  } else {
+    prop.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
+  }
+  MSCCLPP_CUTHROW(cuMulticastGetGranularity(&recMcGran, &prop, CU_MULTICAST_GRANULARITY_RECOMMENDED));
+
+  CUmemGenericAllocationHandle allocHandle;
+  size_t baseSize = ((bufferSize + recMcGran - 1) / recMcGran) * recMcGran;
+  prop.size = baseSize;
+  MSCCLPP_CUTHROW(cuMulticastCreate(&allocHandle, &prop));
+
+  handle->baseSize = baseSize;
+  handle->offsetFromBase = 0;
+  handle->typeFlags = GpuIpcMemHandle::Type::None;
+
+  // POSIX FD handle
+  if (cuMemExportToShareableHandle(&(handle->posixFd.fd), allocHandle, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0) ==
+      CUDA_SUCCESS) {
+    handle->posixFd.pid = getpid();
+    handle->typeFlags |= GpuIpcMemHandle::Type::PosixFd;
+  }
+
+  // FABRIC handle
+  if (isFabricAvailable && (cuMemExportToShareableHandle(&(handle->fabric.handle), allocHandle,
+                                                         CU_MEM_HANDLE_TYPE_FABRIC, 0) == CUDA_SUCCESS)) {
+    handle->typeFlags |= GpuIpcMemHandle::Type::Fabric;
+  }
+
+  if (handle->typeFlags == GpuIpcMemHandle::Type::None) {
+    throw Error("createMulticast failed: neither POSIX FD nor FABRIC handle was created", ErrorCode::SystemError);
+  }
+  return handle;
+#else   // !(CUDA_NVLS_API_AVAILABLE)
+  throw Error("NVLS is not supported on this device (requires CUDA version >= 12.3 and Linux kernel version >= 5.6.0)",
+              ErrorCode::InvalidUsage);
+#endif  // !(CUDA_NVLS_API_AVAILABLE)
+}
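A sketch (internal API, not in the patch) of the importer side that the constructor below implements; `bytes`/`n` stand for a handle received over any IPC channel:

```cpp
#include <cstring>
#include <stdexcept>

// Rebuild a GpuIpcMemHandle from raw bytes (legal: it is trivially copyable)
// and map it; GpuIpcMem prefers Fabric, then PosixFd, then RuntimeIpc.
void* openRemoteMemory(const char* bytes, size_t n) {
  mscclpp::GpuIpcMemHandle h;
  if (n != sizeof(h)) throw std::runtime_error("unexpected handle size");
  std::memcpy(&h, bytes, sizeof(h));
  static mscclpp::GpuIpcMem mem(h);  // sketch: keep the mapping alive somewhere real
  return mem.map();                  // returns base + offsetFromBase
}
```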
+
+GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle)
+    : handle_(handle),
+      isMulticast_(false),
+      multicastBindedAddr_(0),
+      type_(GpuIpcMemHandle::Type::None),
+      basePtr_(nullptr),
+      baseSize_(0),
+      dataPtr_(nullptr),
+      dataSize_(0) {
+  if (handle_.typeFlags == GpuIpcMemHandle::Type::None) {
+    throw Error("GpuIpcMemHandle type is None, cannot create GpuIpcMem", ErrorCode::InvalidUsage);
+  }
+  if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::Fabric)) {
+    if (cuMemImportFromShareableHandle(&allocHandle_, (void*)handle_.fabric.handle, CU_MEM_HANDLE_TYPE_FABRIC) ==
+        CUDA_SUCCESS) {
+      type_ = GpuIpcMemHandle::Type::Fabric;
+    }
+  }
+  if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::PosixFd)) {
+    int dupfd = dupFdFromPid(handle_.posixFd.pid, handle_.posixFd.fd);
+    if (dupfd != -1) {
+      if (cuMemImportFromShareableHandle(&allocHandle_, (void*)(uintptr_t)dupfd,
+                                         CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR) == CUDA_SUCCESS) {
+        type_ = GpuIpcMemHandle::Type::PosixFd;
+      }
+      close(dupfd);
+    }
+  }
+  if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::RuntimeIpc)) {
+    cudaError_t err = cudaIpcOpenMemHandle(&basePtr_, handle_.runtimeIpc.handle, cudaIpcMemLazyEnablePeerAccess);
+    if (err == cudaSuccess) {
+      baseSize_ = handle_.baseSize;
+      dataPtr_ = static_cast<void*>(static_cast<char*>(basePtr_) + handle_.offsetFromBase);
+      dataSize_ = handle_.baseSize - handle_.offsetFromBase;
+      type_ = GpuIpcMemHandle::Type::RuntimeIpc;
+      return;
+    } else {
+      (void)cudaGetLastError();
+    }
+  }
+  if (type_ == GpuIpcMemHandle::Type::None) {
+    std::stringstream ss;
+    ss << "Failed to open GpuIpcMemHandle (type: " << std::to_string(handle_.typeFlags) << ")";
+    throw Error(ss.str(), ErrorCode::Aborted);
+  }
+}
+
+GpuIpcMem::~GpuIpcMem() {
+  if (type_ == GpuIpcMemHandle::Type::RuntimeIpc) {
+    cudaError_t err = cudaIpcCloseMemHandle(basePtr_);
+    if (err != cudaSuccess) {
+      WARN("Failed to close CUDA IPC handle at pointer %p: %s", basePtr_, cudaGetErrorString(err));
+      (void)cudaGetLastError();
+    }
+  } else if (type_ == GpuIpcMemHandle::Type::PosixFd || type_ == GpuIpcMemHandle::Type::Fabric) {
+    CUresult res;
+    const char* errStr;
+    if (basePtr_) {
+      res = cuMemUnmap((CUdeviceptr)basePtr_, baseSize_);
+      if (res != CUDA_SUCCESS) {
+        (void)cuGetErrorString(res, &errStr);
+        WARN("Failed to unmap CUDA memory at pointer %p: %s", basePtr_, errStr);
+      }
+      res = cuMemAddressFree((CUdeviceptr)basePtr_, baseSize_);
+      if (res != CUDA_SUCCESS) {
+        (void)cuGetErrorString(res, &errStr);
+        WARN("Failed to free CUDA memory at pointer %p: %s", basePtr_, errStr);
+      }
+    }
+#if (CUDA_NVLS_API_AVAILABLE)
+    if (isMulticast_ && multicastBindedAddr_) {
+      int deviceId;
+      res = cuPointerGetAttribute(&deviceId, CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL, multicastBindedAddr_);
+      if (res != CUDA_SUCCESS) {
+        (void)cuGetErrorString(res, &errStr);
+        WARN("Failed to get device ordinal for pointer %p: %s", (void*)multicastBindedAddr_, errStr);
+        deviceId = -1;
+      } else if (deviceId < 0) {
+        WARN("Invalid device ordinal %d for pointer %p", deviceId, (void*)multicastBindedAddr_);
+      }
+      CUdevice device;
+      if (cuDeviceGet(&device, deviceId) == CUDA_SUCCESS) {
+        (void)cuMulticastUnbind(allocHandle_, device, 0, baseSize_);
+      }
+    }
+#endif  // (CUDA_NVLS_API_AVAILABLE)
+    res = cuMemRelease(allocHandle_);
+    if (res != CUDA_SUCCESS) {
+      (void)cuGetErrorString(res, &errStr);
+      WARN("Failed to release CUDA memory allocation handle: %s", errStr);
+    }
+  }
+}
+
+void* GpuIpcMem::map() {
+  if (type_ == GpuIpcMemHandle::Type::None) {
+    throw Error("GpuIpcMemHandle type is None, cannot map memory", ErrorCode::InvalidUsage);
+  } else if (dataPtr_ != nullptr) {
+    // Already mapped
+    return dataPtr_;
+  }
+
+  size_t pageSize = getpagesize();
+  if (handle_.baseSize % pageSize) {
+    std::stringstream ss;
+    ss << "Tried to map remote GPU memory with size " << handle_.baseSize
+       << " that is not a multiple of the local host page size " << pageSize;
+    throw Error(ss.str(), ErrorCode::InvalidUsage);
+  }
+
+  int deviceId;
+  MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
+
+  CUdeviceptr base;
+  size_t minGran;
+  CUmemAllocationProp prop = {};
+  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
+  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
+  prop.location.id = deviceId;
+  MSCCLPP_CUTHROW(cuMemGetAllocationGranularity(&minGran, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM));
+  MSCCLPP_CUTHROW(cuMemAddressReserve(&base, handle_.baseSize, minGran, 0, 0));
+  MSCCLPP_CUTHROW(cuMemMap(base, handle_.baseSize, 0, allocHandle_, 0));
+
+  CUmemAccessDesc accessDesc = {};
+  accessDesc.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
+  accessDesc.location.id = deviceId;
+  accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
+  MSCCLPP_CUTHROW(cuMemSetAccess(base, handle_.baseSize, &accessDesc, 1));
+
+  basePtr_ = (void*)base;
+  baseSize_ = handle_.baseSize;
+  dataPtr_ = static_cast<void*>(static_cast<char*>(basePtr_) + handle_.offsetFromBase);
+  dataSize_ = handle_.baseSize - handle_.offsetFromBase;
+  return dataPtr_;
+}
+
+void* GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]] const CUdeviceptr bufferAddr,
+                              [[maybe_unused]] size_t bufferSize) {
+#if (CUDA_NVLS_API_AVAILABLE)
+  if (type_ != GpuIpcMemHandle::Type::PosixFd && type_ != GpuIpcMemHandle::Type::Fabric) {
+    throw Error("GpuIpcMemHandle type is not PosixFd or Fabric, cannot map multicast memory", ErrorCode::InvalidUsage);
+  }
+  int deviceId;
+  MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
+  MSCCLPP_CUTHROW(cuMulticastAddDevice(allocHandle_, deviceId));
+
+  size_t minMcGran;
+  CUmulticastObjectProp prop;
+  prop.size = handle_.baseSize;
+  prop.numDevices = numDevices;
+  prop.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR | CU_MEM_HANDLE_TYPE_FABRIC;
+
+  MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGran, &prop, CU_MULTICAST_GRANULARITY_MINIMUM));
+
+  CUdeviceptr bufferPtr;
+  if (bufferAddr != 0) {
+    if (!isCuMemMapAllocated((void*)bufferAddr)) {
+      throw Error("This NVLS connection tried to bind a buffer that was not allocated with cuMemMap",
+                  ErrorCode::InvalidUsage);
+    }
+    if ((uintptr_t)bufferAddr % minMcGran != 0) {
+      throw Error("This NVLS connection tried to bind a buffer that is not aligned to the minimum granularity",
+                  ErrorCode::InvalidUsage);
+    }
+    if (bufferSize == 0) {
+      throw Error("NVLS buffer size should be larger than zero.", ErrorCode::InvalidUsage);
+    }
+    if (bufferSize % minMcGran != 0) {
+      std::stringstream ss;
+      ss << "Tried to bind a multicast buffer that is not aligned to the minimum granularity " << minMcGran
+         << ", buffer size: " << bufferSize;
+      throw Error(ss.str(), ErrorCode::InvalidUsage);
+    }
+    bufferPtr = bufferAddr;
+  } else {
+    multicastBuffer_ = GpuBuffer(handle_.baseSize).memory();
+    bufferPtr = (CUdeviceptr)(multicastBuffer_.get());
+    bufferSize = handle_.baseSize;
+  }
+
+  // will block until all devices call cuMulticastAddDevice()
+  MSCCLPP_CUTHROW(cuMulticastBindAddr(allocHandle_, 0, bufferPtr, bufferSize, 0));
+  multicastBindedAddr_ = bufferPtr;
+
+  CUdeviceptr mcPtr;
+  MSCCLPP_CUTHROW(cuMemAddressReserve(&mcPtr, bufferSize, minMcGran, 0U, 0));
+  MSCCLPP_CUTHROW(cuMemMap(mcPtr, bufferSize, 0, allocHandle_, 0));
+
+  CUmemAccessDesc accessDesc = {};
+  accessDesc.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
+  accessDesc.location.id = deviceId;
+  accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
+  MSCCLPP_CUTHROW(cuMemSetAccess(mcPtr, bufferSize, &accessDesc, 1));
+
+  basePtr_ = (void*)mcPtr;
+  baseSize_ = handle_.baseSize;
+  dataPtr_ = basePtr_;
+  dataSize_ = bufferSize;
+  isMulticast_ = true;
+  return dataPtr_;
+#else   // !(CUDA_NVLS_API_AVAILABLE)
+  throw Error("NVLS is not supported on this device (requires CUDA version >= 12.3 and Linux kernel version >= 5.6.0)",
+              ErrorCode::InvalidUsage);
+#endif  // !(CUDA_NVLS_API_AVAILABLE)
+}
+
+void* GpuIpcMem::data() const {
+  if (!dataPtr_) {
+    throw Error("GpuIpcMem data pointer is null. Call map() first.", ErrorCode::InvalidUsage);
+  }
+  return dataPtr_;
+}
+
+}  // namespace mscclpp
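To summarize the multicast path end to end (sketch, not in the patch; `bufferSize`, `numDevices`, and the broadcast step are assumed):

```cpp
// Root: create the multicast object and export a shareable handle.
mscclpp::UniqueGpuIpcMemHandle mcHandle =
    mscclpp::GpuIpcMemHandle::createMulticast(bufferSize, numDevices);
// ... broadcast the sizeof(GpuIpcMemHandle) bytes of *mcHandle to every rank ...

// Every rank: import and map. With bufferAddr == 0, mapMulticast binds a freshly
// allocated GpuBuffer; cuMulticastBindAddr blocks until all numDevices ranks
// have added their device via cuMulticastAddDevice().
mscclpp::GpuIpcMem mcMem(*mcHandle);
void* mcPtr = mcMem.mapMulticast(numDevices);
```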
diff --git a/src/gpu_utils.cc b/src/gpu_utils.cc
index b186415c6..a0cb95311 100644
--- a/src/gpu_utils.cc
+++ b/src/gpu_utils.cc
@@ -68,8 +68,6 @@ std::shared_ptr<GpuStreamPool> gpuStreamPool() {

 namespace detail {

-CUmemAllocationHandleType nvlsCompatibleMemHandleType = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
-
 /// set memory access permission to read-write
 /// @param base Base memory pointer.
 /// @param size Size of the memory.
@@ -165,7 +163,6 @@ void* gpuCallocPhysical(size_t bytes, size_t gran, size_t align) {
   } else {
     MSCCLPP_CUTHROW(result);
   }
-  nvlsCompatibleMemHandleType = (CUmemAllocationHandleType)requestedHandleTypes;

   if (align == 0) {
     align = getMulticastGranularity(nbytes, CU_MULTICAST_GRANULARITY_MINIMUM);
diff --git a/src/include/communicator.hpp b/src/include/communicator.hpp
index d2735618d..53d3e8dac 100644
--- a/src/include/communicator.hpp
+++ b/src/include/communicator.hpp
@@ -36,7 +36,7 @@ class RecvItem : public BaseRecvItem {

 class LocalRecvMemory {
  public:
-  LocalRecvMemory() : future_(std::move(promise_.get_future())) {}
+  LocalRecvMemory() : future_(promise_.get_future()) {}

   void set(RegisteredMemory memory) { promise_.set_value(std::move(memory)); }
diff --git a/src/include/connection.hpp b/src/include/connection.hpp
index ae6ea8d0f..b9df1a736 100644
--- a/src/include/connection.hpp
+++ b/src/include/connection.hpp
@@ -15,9 +15,15 @@

 namespace mscclpp {

+class SwitchChannel;
+
 class CudaIpcConnection : public Connection {
  private:
   std::shared_ptr stream_;
+  std::shared_ptr<GpuIpcMem> nvlsMem_;
+  size_t nvlsNumDevs_;
+
+  friend class SwitchChannel;

  public:
   CudaIpcConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint, const Endpoint& remoteEndpoint);
diff --git a/src/include/endpoint.hpp b/src/include/endpoint.hpp
index 8dfbb6903..fadd9ee5b 100644
--- a/src/include/endpoint.hpp
+++ b/src/include/endpoint.hpp
@@ -7,6 +7,7 @@
 #include
 #include

+#include "gpu_ipc_mem.hpp"
 #include "ib.hpp"
 #include "socket.h"

@@ -15,14 +16,12 @@ namespace mscclpp {

 struct Endpoint::Impl {
-  Impl(EndpointConfig config, Context::Impl& contextImpl);
+  Impl(const EndpointConfig& config, Context::Impl& contextImpl);
   Impl(const std::vector<char>& serialization);

-  Transport transport_;
-  Device device_;
+  EndpointConfig config_;
   uint64_t hostHash_;
   uint64_t pidHash_;
-  int maxWriteQueueSize_;

   // The following are only used for IB and are undefined for other transports.
   bool ibLocal_;
@@ -34,6 +33,9 @@ struct Endpoint::Impl {
   SocketAddress socketAddress_;
   volatile uint32_t* abortFlag_;
   char netIfName_[MAX_IF_NAME_SIZE + 1];
+
+  // Only for NVLS transport.
+  UniqueGpuIpcMemHandle nvlsHandle_;
 };

 }  // namespace mscclpp
diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp
index 0b462811e..1d277a158 100644
--- a/src/include/execution_common.hpp
+++ b/src/include/execution_common.hpp
@@ -5,8 +5,8 @@
 #define MSCCLPP_EXECUTION_COMMON_HPP_

 #include
-#include <mscclpp/nvls.hpp>
 #include
+#include <mscclpp/switch_channel.hpp>

 namespace mscclpp {
diff --git a/src/include/gpu_ipc_mem.hpp b/src/include/gpu_ipc_mem.hpp
new file mode 100644
index 000000000..e589ea8d4
--- /dev/null
+++ b/src/include/gpu_ipc_mem.hpp
@@ -0,0 +1,97 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_GPU_IPC_MEM_HPP_
+#define MSCCLPP_GPU_IPC_MEM_HPP_
+
+#include
+#include
+#include
+
+namespace mscclpp {
+
+struct GpuIpcMemHandle {
+  struct Type {
+    static constexpr uint8_t None = 0;
+    static constexpr uint8_t RuntimeIpc = 1;
+    static constexpr uint8_t PosixFd = 2;
+    static constexpr uint8_t Fabric = 4;
+  };
+
+  using TypeFlags = uint8_t;
+
+  TypeFlags typeFlags;
+  size_t baseSize;
+  size_t offsetFromBase;
+
+  struct {
+    cudaIpcMemHandle_t handle;
+  } runtimeIpc;
+
+  struct {
+    int pid;
+    int fd;
+  } posixFd;
+
+  struct {
+    char handle[64];
+  } fabric;
+
+  static void deleter(GpuIpcMemHandle *handle);
+
+  // We make GpuIpcMemHandle trivially copyable for easy serialization,
+  // and thus it cannot have explicit destructors.
+  // We use a custom deleter for unique_ptr to handle cleanup without a destructor.
+  struct UniquePtr : public std::unique_ptr<GpuIpcMemHandle, decltype(&deleter)> {
+    using Base = std::unique_ptr<GpuIpcMemHandle, decltype(&deleter)>;
+
+    // Default constructor
+    UniquePtr() : Base(nullptr, &GpuIpcMemHandle::deleter) {}
+
+    // Inherit other constructors
+    using Base::Base;
+
+    // Allow implicit conversion from Base
+    UniquePtr(Base &&other) : Base(std::move(other)) {}
+  };
+
+  static UniquePtr create(const CUdeviceptr ptr);
+  static UniquePtr createMulticast(size_t bufferSize, int numDevices);
+};
+
+using UniqueGpuIpcMemHandle = GpuIpcMemHandle::UniquePtr;
+
+static_assert(std::is_trivially_copyable_v<GpuIpcMemHandle>);
+
+class GpuIpcMem {
+ public:
+  GpuIpcMem(const GpuIpcMemHandle &handle);
+
+  ~GpuIpcMem();
+
+  void *map();
+
+  void *mapMulticast(int numDevices, const CUdeviceptr bufferAddr = 0, size_t bufferSize = 0);
+
+  void *multicastBuffer() const { return isMulticast_ ? multicastBuffer_.get() : nullptr; }
+
+  void *data() const;
+
+  size_t size() const { return dataSize_; }
+
+ private:
+  GpuIpcMemHandle handle_;
+  CUmemGenericAllocationHandle allocHandle_;
+  std::shared_ptr multicastBuffer_;
+  bool isMulticast_;
+  [[maybe_unused]] CUdeviceptr multicastBindedAddr_;
+  uint8_t type_;
+  void *basePtr_;
+  size_t baseSize_;
+  void *dataPtr_;
+  size_t dataSize_;
+};
+
+}  // namespace mscclpp
+
+#endif  // MSCCLPP_GPU_IPC_MEM_HPP_
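One consequence of the trivially-copyable design worth spelling out (sketch, not in the patch): serialization can be a plain byte copy, and only the owning `UniquePtr` ever closes the POSIX fd.

```cpp
#include <cstring>
#include <vector>

// Copies are plain bytes; no destructor runs on them, so a copy never closes
// the fd out from under the owner. Cleanup happens only via UniquePtr's deleter.
std::vector<char> packHandle(const mscclpp::GpuIpcMemHandle& h) {
  std::vector<char> out(sizeof(h));
  std::memcpy(out.data(), &h, sizeof(h));  // OK: the static_assert above guarantees this
  return out;
}
```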
+class GpuIpcMem {
+ public:
+  GpuIpcMem(const GpuIpcMemHandle &handle);
+
+  ~GpuIpcMem();
+
+  void *map();
+
+  void *mapMulticast(int numDevices, const CUdeviceptr bufferAddr = 0, size_t bufferSize = 0);
+
+  void *multicastBuffer() const { return isMulticast_ ? multicastBuffer_.get() : nullptr; }
+
+  void *data() const;
+
+  size_t size() const { return dataSize_; }
+
+ private:
+  GpuIpcMemHandle handle_;
+  CUmemGenericAllocationHandle allocHandle_;
+  std::shared_ptr<void> multicastBuffer_;
+  bool isMulticast_;
+  [[maybe_unused]] CUdeviceptr multicastBindedAddr_;
+  uint8_t type_;
+  void *basePtr_;
+  size_t baseSize_;
+  void *dataPtr_;
+  size_t dataSize_;
+};
+
+}  // namespace mscclpp
+
+#endif  // MSCCLPP_GPU_IPC_MEM_HPP_
diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp
index 9e63af00c..13c3f3357 100644
--- a/src/include/registered_memory.hpp
+++ b/src/include/registered_memory.hpp
@@ -9,6 +9,7 @@
 #include <mscclpp/core.hpp>
 #include <mscclpp/gpu.hpp>
 
 #include "communicator.hpp"
+#include "gpu_ipc_mem.hpp"
 #include "ib.hpp"
 
 namespace mscclpp {
 
@@ -19,25 +20,11 @@ struct TransportInfo {
   // TODO: rewrite this using std::variant or something
   bool ibLocal;
   union {
-    struct {
-      cudaIpcMemHandle_t cudaIpcBaseHandle;
-      size_t cudaIpcOffsetFromBase;
-    };
+    GpuIpcMemHandle gpuIpcMemHandle;
     struct {
       const IbMr* ibMr;
       IbMrInfo ibMrInfo;
     };
-    struct {
-      union {
-        char shareableHandle[64];
-        struct {
-          // These are only defined for multicast (NVLS) capability
-          pid_t rootPid;
-          int fileDesc;
-        };
-      };
-      size_t offsetFromBase;
-    };
   };
 };
 
@@ -48,26 +35,22 @@ struct RegisteredMemory::Impl {
   // This is the original data pointer the RegisteredMemory was created with.
   void* originalDataPtr;
   size_t size;
-  // This is the size returned by cuMemGetAddressRange of data
-  size_t baseDataSize;
   uint64_t hostHash;
   uint64_t pidHash;
-  bool isCuMemMapAlloc;
   TransportFlags transports;
   std::vector<TransportInfo> transportInfos;
 
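+  // IPC handle exported for the local allocation; for memory deserialized from
+  // a peer, remoteGpuIpcMem holds the imported, mapped view.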
+  UniqueGpuIpcMemHandle localGpuIpcHandle;
+  std::unique_ptr<GpuIpcMem> remoteGpuIpcMem;
+
   // Only used for IB transport
   std::unordered_map<Transport, std::unique_ptr<IbMr>> ibMrMap;
 
-  // For sharing memory handle via file descriptor
-  int fileDesc = -1;
-
   Impl(void* data, size_t size, TransportFlags transports, Context::Impl& contextImpl);
   Impl(const std::vector<char>::const_iterator& begin, const std::vector<char>::const_iterator& end);
   /// Constructs a RegisteredMemory::Impl from a vector of data. The constructor should only be used for the remote
   /// memory.
   Impl(const std::vector<char>& data);
-  ~Impl();
 
   const TransportInfo& getTransportInfo(Transport transport) const;
 };
diff --git a/src/nvls.cc b/src/nvls.cc
deleted file mode 100644
index ab7548641..000000000
--- a/src/nvls.cc
+++ /dev/null
@@ -1,326 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-#include <sys/syscall.h>
-#include <unistd.h>
-
-#include <algorithm>
-#include <mscclpp/core.hpp>
-#include <mscclpp/gpu_utils.hpp>
-#include <mscclpp/nvls.hpp>
-
-#include "api.h"
-#include "debug.h"
-#include "endpoint.hpp"
-
-namespace mscclpp {
-
-#if (CUDA_NVLS_API_AVAILABLE)
-class NvlsConnection::Impl : public std::enable_shared_from_this<NvlsConnection::Impl> {
- public:
-  // use this only for the root of the NVLS
-  Impl(size_t bufferSize, int numDevices);
-  Impl(const std::vector<char>& data);
-  ~Impl();
-
-  Impl(const Impl&) = delete;
-  Impl& operator=(const Impl&) = delete;
-
-  size_t getMinMcGran() { return minMcGran_; }
-  std::vector<char> serialize();
-  void addDevice(int cudaDeviceId);
-  size_t allocateBuffer(size_t size);
-  void freeBuffer(size_t offset, size_t size) noexcept;
-  std::shared_ptr<char> bindMemory(CUdeviceptr devicePtr, size_t devBuffSize);
-
- private:
-  friend class NvlsConnection;
-
-  CUmemGenericAllocationHandle mcHandle_;
-  CUmulticastObjectProp mcProp_;
-  size_t bufferSize_;
-  size_t minMcGran_;
-  size_t mcGran_;
-  // These are only defined for multicast (NVLS) capability
-  pid_t rootPid_;
-  int mcFileDesc_;
-
-  std::list<std::pair<size_t, size_t>> allocatedRanges_;
-  std::list<std::pair<size_t, size_t>> freeRanges_;
-};
-
-NvlsConnection::Impl::Impl(size_t bufferSize, int numDevices) {
-  minMcGran_ = 0;
-  mcGran_ = 0;
-  mcProp_ = {};
-  mcProp_.size = bufferSize;
-  mcProp_.numDevices = numDevices;
-  mcProp_.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
-  MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGran_, &mcProp_, CU_MULTICAST_GRANULARITY_MINIMUM));
-  MSCCLPP_CUTHROW(cuMulticastGetGranularity(&mcGran_, &mcProp_, CU_MULTICAST_GRANULARITY_RECOMMENDED));
-  mcProp_.size = ((mcProp_.size + mcGran_ - 1) / mcGran_) * mcGran_;
-  bufferSize_ = mcProp_.size;
-  MSCCLPP_CUTHROW(cuMulticastCreate(&mcHandle_, &mcProp_));
-  mcFileDesc_ = 0;
-  MSCCLPP_CUTHROW(
-      cuMemExportToShareableHandle(&mcFileDesc_, mcHandle_, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0 /*flags*/));
-  freeRanges_.emplace_back(0, bufferSize_);
-
-  rootPid_ = getpid();
-  if (rootPid_ < 0) {
-    throw mscclpp::SysError("getpid() failed", errno);
-  }
-
-  INFO(MSCCLPP_COLL,
-       "NVLS handle created on root with size %ld. minGranularity %ld and recommendedGranularity %ld buffer size is "
-       "%ld, adjusted size is %ld",
-       mcProp_.size, minMcGran_, mcGran_, bufferSize, bufferSize_);
-}
-
-NvlsConnection::Impl::Impl(const std::vector<char>& data) {
-  auto it = data.begin();
-  std::copy_n(it, sizeof(this->mcHandle_), reinterpret_cast<char*>(&this->mcHandle_));
-  it += sizeof(this->mcHandle_);
-  std::copy_n(it, sizeof(this->bufferSize_), reinterpret_cast<char*>(&this->bufferSize_));
-  it += sizeof(this->bufferSize_);
-  std::copy_n(it, sizeof(this->minMcGran_), reinterpret_cast<char*>(&this->minMcGran_));
-  it += sizeof(this->minMcGran_);
-  std::copy_n(it, sizeof(this->mcGran_), reinterpret_cast<char*>(&this->mcGran_));
-  it += sizeof(this->mcGran_);
-  std::copy_n(it, sizeof(this->rootPid_), reinterpret_cast<char*>(&this->rootPid_));
-  it += sizeof(this->rootPid_);
-  std::copy_n(it, sizeof(this->mcFileDesc_), reinterpret_cast<char*>(&this->mcFileDesc_));
-
-  freeRanges_.emplace_back(0, bufferSize_);
-  int rootPidFd = syscall(SYS_pidfd_open, rootPid_, 0);
-  if (rootPidFd < 0) {
-    throw mscclpp::SysError("pidfd_open() failed", errno);
-  }
-  int mcRootFileDescFd = syscall(SYS_pidfd_getfd, rootPidFd, mcFileDesc_, 0);
-  if (mcRootFileDescFd < 0) {
-    throw mscclpp::SysError("pidfd_getfd() failed", errno);
-  }
-  MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&mcHandle_, reinterpret_cast<void*>(mcRootFileDescFd),
-                                                 CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
-  close(rootPidFd);
-  close(mcRootFileDescFd);
-
-  INFO(MSCCLPP_COLL, "NVLS handle was imported from root");
-}
-
-NvlsConnection::Impl::~Impl() {
-  // we don't need to free multicast handle object according to NCCL.
-  if (rootPid_ == getpid()) {
-    close(mcFileDesc_);
-  }
-}
-
-std::vector<char> NvlsConnection::Impl::serialize() {
-  std::vector<char> result;
-  std::copy_n(reinterpret_cast<char*>(&mcHandle_), sizeof(mcHandle_), std::back_inserter(result));
-  std::copy_n(reinterpret_cast<char*>(&bufferSize_), sizeof(bufferSize_), std::back_inserter(result));
-  std::copy_n(reinterpret_cast<char*>(&minMcGran_), sizeof(minMcGran_), std::back_inserter(result));
-  std::copy_n(reinterpret_cast<char*>(&mcGran_), sizeof(mcGran_), std::back_inserter(result));
-  std::copy_n(reinterpret_cast<char*>(&rootPid_), sizeof(rootPid_), std::back_inserter(result));
-  std::copy_n(reinterpret_cast<char*>(&mcFileDesc_), sizeof(mcFileDesc_), std::back_inserter(result));
-  return result;
-}
-
-void NvlsConnection::Impl::addDevice(int cudaDeviceId) {
-  MSCCLPP_CUTHROW(cuMulticastAddDevice(mcHandle_, cudaDeviceId));
-  INFO(MSCCLPP_COLL, "NVLS connection created");
-}
-
-// TODO(binyli): For cuMemMap, we can not map handle to va with offset not equal to 0.
-// Then we don't need to maintain the freeRanges_ list. For different memory, we could map to different mc handle.
-size_t NvlsConnection::Impl::allocateBuffer(size_t size) {
-  if (freeRanges_.empty()) {
-    throw Error("This NVLS connection mapped more than it was supposed to", ErrorCode::InvalidUsage);
-  }
-  auto it = std::find_if(freeRanges_.begin(), freeRanges_.end(),
-                         [size](const std::pair<size_t, size_t>& range) { return range.second >= size; });
-  if (it != freeRanges_.end()) {
-    size_t offset = it->first;
-    size_t rangeSize = it->second;
-    if (rangeSize == size) {
-      freeRanges_.erase(it);
-    } else {
-      it->first += size;
-      it->second -= size;
-    }
-    allocatedRanges_.emplace_back(offset, size);
-    INFO(MSCCLPP_COLL, "NVLS connection allocated %ld bytes at offset %ld", size, offset);
-    return offset;
-  }
-  throw Error("This NVLS connection cannot map the requested devBuffSize", ErrorCode::InvalidUsage);
-}
-
-void NvlsConnection::Impl::freeBuffer(size_t offset, size_t size) noexcept {
-  auto it = std::find_if(
-      allocatedRanges_.begin(), allocatedRanges_.end(),
-      [offset, size](const std::pair<size_t, size_t>& range) { return range.first == offset && range.second == size; });
-  if (it == allocatedRanges_.end()) {
-    WARN("NVLS connection tried to free a buffer that was not allocated");
-    return;
-  }
-  allocatedRanges_.erase(it);
-  it = std::find_if(freeRanges_.begin(), freeRanges_.end(), [offset, size](const std::pair<size_t, size_t>& range) {
-    return range.first + range.second >= offset;
-  });
-  if (it == freeRanges_.end()) {
-    freeRanges_.emplace_back(offset, size);
-    return;
-  }
-  if (it->first + it->second == offset) {
-    // merge with the previous free range if possible
-    it->second += size;
-    // merge with the next free range if possible
-    auto nextItr = std::next(it);
-    if (nextItr != freeRanges_.end() && it->first + it->second == nextItr->first) {
-      it->second += nextItr->second;
-      freeRanges_.erase(nextItr);
-    }
-    return;
-  } else if (it->first == offset + size) {
-    // merge with the next free range if possible
-    it->first -= size;
-    it->second += size;
-    return;
-  } else {
-    freeRanges_.emplace(it, offset, size);
-    return;
-  }
-}
-
-std::shared_ptr<char> NvlsConnection::Impl::bindMemory(CUdeviceptr devicePtr, size_t devBuffSize) {
-  if (!isCuMemMapAllocated((void*)devicePtr)) {
-    throw Error("This NVLS connection tried to bind a buffer that was not allocated with cuMemMap",
-                ErrorCode::InvalidUsage);
-  }
-
-  if ((uintptr_t)devicePtr % minMcGran_ != 0) {
-    WARN("NVLS connection tried to bind a buffer that is not aligned to the minimum granularity");
-    throw Error("This NVLS connection tried to bind a buffer that is not aligned to the minimum granularity",
-                ErrorCode::InvalidUsage);
-  }
-  devBuffSize = ((devBuffSize + minMcGran_ - 1) / minMcGran_) * minMcGran_;
-  size_t offset = allocateBuffer(devBuffSize);
-  MSCCLPP_CUTHROW(cuMulticastBindAddr(mcHandle_, offset /*mcOffset*/, devicePtr, devBuffSize, 0));
-
-  char* mcPtr;
-  MSCCLPP_CUTHROW(cuMemAddressReserve((CUdeviceptr*)(&mcPtr), devBuffSize, minMcGran_, 0U, 0));
-  MSCCLPP_CUTHROW(cuMemMap((CUdeviceptr)(mcPtr), devBuffSize, 0, mcHandle_, 0));
-  detail::setReadWriteMemoryAccess(mcPtr, devBuffSize);
-  INFO(MSCCLPP_COLL, "NVLS connection bound memory at offset %ld, size %ld", offset, devBuffSize);
-
-  auto deleter = [=, self = shared_from_this()](char* ptr) {
-    int deviceId;
-    CUdevice device;
-    MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
-    MSCCLPP_CUTHROW(cuDeviceGet(&device, deviceId));
-    MSCCLPP_CUTHROW(cuMemUnmap((CUdeviceptr)ptr, devBuffSize));
-    MSCCLPP_CUTHROW(cuMemAddressFree((CUdeviceptr)ptr, devBuffSize));
-    // Refer to NCCL, Unbind can trigger RM error if buffer is freed already by users.
-    // Ignore error here, unbind will succeed anyway.
-    cuMulticastUnbind(mcHandle_, device, offset, devBuffSize);
-    self->freeBuffer(offset, devBuffSize);
-  };
-
-  return std::shared_ptr<char>(mcPtr, deleter);
-}
-
-#else  // !(CUDA_NVLS_API_AVAILABLE)
-class NvlsConnection::Impl {
- public:
-  // use this only for the root of the NVLS
-  Impl(size_t, int) { throw notSupportedError; }
-  Impl(const std::vector<char>&) { throw notSupportedError; }
-
-  Impl(const Impl&) = delete;
-  Impl& operator=(const Impl&) = delete;
-
-  std::vector<char> serialize() { throw notSupportedError; }
-  size_t allocateBuffer(size_t) { throw notSupportedError; }
-  void freeBuffer(size_t, size_t) { throw notSupportedError; }
-  std::shared_ptr<char> bindMemory(CUdeviceptr, size_t) { throw notSupportedError; }
-  void addDevice(int) { throw notSupportedError; }
-  size_t getMinMcGran() { throw notSupportedError; }
-
- private:
-  Error notSupportedError =
-      Error("NVLS is not supported on this CUDA version (< 12.3) or kernel version (< 5.6.0)", ErrorCode::InvalidUsage);
-};
-#endif  // !(CUDA_NVLS_API_AVAILABLE)
-
-NvlsConnection::NvlsConnection(size_t bufferSize, int numDevices)
-    : pimpl_(std::make_shared<Impl>(bufferSize, numDevices)) {}
-
-void NvlsConnection::addDevice() {
-  int cudaDeviceId;
-  MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDeviceId));
-  this->addDevice(cudaDeviceId);
-}
-
-void NvlsConnection::addDevice(int cudaDeviceId) { pimpl_->addDevice(cudaDeviceId); }
-
-NvlsConnection::NvlsConnection(const std::vector<char>& data) : pimpl_(std::make_shared<Impl>(data)) {}
-
-std::vector<char> NvlsConnection::serialize() { return pimpl_->serialize(); }
-
-SwitchChannel NvlsConnection::bindAllocatedMemory(CUdeviceptr devicePtr, size_t size) {
-  auto mcPtr = pimpl_->bindMemory(devicePtr, size);
-  return SwitchChannel((void*)devicePtr, mcPtr, size);
-}
-
-SwitchChannel::DeviceHandle SwitchChannel::deviceHandle() const {
-  SwitchChannel::DeviceHandle device;
-  device.devicePtr = this->devicePtr_;
-  device.mcPtr = this->mcPtr_.get();
-  device.bufferSize = this->bufferSize_;
-  return device;
-};
-
-void* SwitchChannel::getDevicePtr() { return devicePtr_; };
-
-size_t NvlsConnection::getMultiCastMinGranularity() { return pimpl_->getMinMcGran(); }
-
-MSCCLPP_API_CPP std::shared_ptr<NvlsConnection> connectNvlsCollective(std::shared_ptr<Communicator> comm,
-                                                                      std::vector<int> allRanks, size_t bufferSize) {
-  auto bootstrap = comm->bootstrap();
-  int rank = bootstrap->getRank();
-  bool isRoot = false;
-  bool amongAllRanks = false;
-  int rootRank = allRanks[0];
-  for (auto nvlsRank : allRanks) {
-    if (nvlsRank == rank) amongAllRanks = true;
-    rootRank = std::min(rootRank, nvlsRank);
-  }
-  if (amongAllRanks == false) {
-    throw Error("rank is not among allRanks", ErrorCode::InvalidUsage);
-  }
-  if (rootRank == rank) isRoot = true;
-
-  std::shared_ptr<NvlsConnection> conn;
-  if (isRoot) {
-    conn = std::make_shared<NvlsConnection>(bufferSize, allRanks.size());
-    auto serialized = conn->serialize();
-    for (auto nvlsRank : allRanks) {
-      if (nvlsRank != rank) bootstrap->send(serialized, nvlsRank, 0);
-    }
-  } else {
-    std::vector<char> data;
-    bootstrap->recv(data, rootRank, 0);
-    conn = std::make_shared<NvlsConnection>(data);
-  }
-
-  // Now let's synchronize all ranks
-  bootstrap->groupBarrier(allRanks);
-  // now it is safe to add my device
-  conn->addDevice();
-
-  // sync here to make sure all ranks have added their devices
-  bootstrap->groupBarrier(allRanks);
-  return conn;
-}
-
-}  // namespace mscclpp
diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 85702ea14..9cfc3fbf9 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -27,21 +27,6 @@
   } \
 } while (false)
 
-namespace {
-CUmemAllocationHandleType getNvlsMemHandleType() {
-#if (CUDA_NVLS_API_AVAILABLE)
-  if (mscclpp::detail::nvlsCompatibleMemHandleType & CU_MEM_HANDLE_TYPE_FABRIC) {
-    return CU_MEM_HANDLE_TYPE_FABRIC;
-  } else {
-    return CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
-  }
-#else
-  throw mscclpp::Error("Only support GPU with NVLS support", mscclpp::ErrorCode::InvalidUsage);
-#endif
-}
-
-}  // namespace
-
 namespace mscclpp {
 
 RegisteredMemory::Impl::Impl(void* data, size_t size, TransportFlags transports, Context::Impl& contextImpl)
@@ -52,37 +37,11 @@ RegisteredMemory::Impl::Impl(void* data, size_t size, TransportFlags transports
       pidHash(getPidHash()),
       transports(transports) {
   if (transports.has(Transport::CudaIpc)) {
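+    // Export an IPC-shareable handle for this allocation up front; peers
+    // deserialize it into a GpuIpcMem and map it on their side.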
+    localGpuIpcHandle = GpuIpcMemHandle::create(reinterpret_cast<CUdeviceptr>(data));
     TransportInfo transportInfo;
     transportInfo.transport = Transport::CudaIpc;
-
-    void* baseDataPtr;
-    size_t baseDataSize;
-    MSCCLPP_CUTHROW(cuMemGetAddressRange((CUdeviceptr*)&baseDataPtr, &baseDataSize, (CUdeviceptr)data));
-    this->baseDataSize = baseDataSize;
-    this->isCuMemMapAlloc = isCuMemMapAllocated(baseDataPtr);
-    if (this->isCuMemMapAlloc) {
-      CUmemGenericAllocationHandle handle;
-      MSCCLPP_CUTHROW(cuMemRetainAllocationHandle(&handle, baseDataPtr));
-      if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
-        MSCCLPP_CUTHROW(cuMemExportToShareableHandle(transportInfo.shareableHandle, handle, getNvlsMemHandleType(), 0));
-      } else {
-        transportInfo.rootPid = getpid();
-        if (transportInfo.rootPid < 0) {
-          throw SysError("getpid() failed", errno);
-        }
-        MSCCLPP_CUTHROW(cuMemExportToShareableHandle(&transportInfo.fileDesc, handle, getNvlsMemHandleType(), 0));
-        this->fileDesc = transportInfo.fileDesc;
-      }
-      transportInfo.offsetFromBase = (char*)data - (char*)baseDataPtr;
-      MSCCLPP_CUTHROW(cuMemRelease(handle));
-    } else {
-      cudaIpcMemHandle_t handle;
-      MSCCLPP_CUDATHROW(cudaIpcGetMemHandle(&handle, baseDataPtr));
-      // TODO: bug with offset of base?
-      transportInfo.cudaIpcBaseHandle = handle;
-      transportInfo.cudaIpcOffsetFromBase = (char*)data - (char*)baseDataPtr;
-    }
-    this->transportInfos.push_back(transportInfo);
+    transportInfo.gpuIpcMemHandle = *localGpuIpcHandle;
+    this->transportInfos.emplace_back(transportInfo);
   }
   if ((transports & AllIBTransports).any()) {
     auto addIb = [&](Transport ibTransport) {
@@ -122,10 +81,8 @@ MSCCLPP_API_CPP std::vector<char> RegisteredMemory::serialize() const {
   std::vector<char> result;
   detail::serialize(result, pimpl_->originalDataPtr);
   detail::serialize(result, pimpl_->size);
-  detail::serialize(result, pimpl_->baseDataSize);
   detail::serialize(result, pimpl_->hostHash);
   detail::serialize(result, pimpl_->pidHash);
-  detail::serialize(result, pimpl_->isCuMemMapAlloc);
   detail::serialize(result, pimpl_->transports);
   if (pimpl_->transportInfos.size() > static_cast<size_t>(std::numeric_limits<int8_t>::max())) {
     throw Error("Too many transport info entries", ErrorCode::InternalError);
@@ -135,18 +92,7 @@ MSCCLPP_API_CPP std::vector<char> RegisteredMemory::serialize() const {
   for (auto& entry : pimpl_->transportInfos) {
     detail::serialize(result, entry.transport);
     if (entry.transport == Transport::CudaIpc) {
-      if (pimpl_->isCuMemMapAlloc) {
-        if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
-          detail::serialize(result, entry.shareableHandle);
-        } else {
-          detail::serialize(result, entry.rootPid);
-          detail::serialize(result, entry.fileDesc);
-        }
-        detail::serialize(result, entry.offsetFromBase);
-      } else {
-        detail::serialize(result, entry.cudaIpcBaseHandle);
-        detail::serialize(result, entry.cudaIpcOffsetFromBase);
-      }
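+      // GpuIpcMemHandle is trivially copyable, so its bytes are serialized directly.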
+      detail::serialize(result, entry.gpuIpcMemHandle);
     } else if (AllIBTransports.has(entry.transport)) {
       detail::serialize(result, entry.ibMrInfo);
     } else {
@@ -165,10 +111,8 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
   auto it = begin;
   it = detail::deserialize(it, this->originalDataPtr);
   it = detail::deserialize(it, this->size);
-  it = detail::deserialize(it, this->baseDataSize);
   it = detail::deserialize(it, this->hostHash);
   it = detail::deserialize(it, this->pidHash);
-  it = detail::deserialize(it, this->isCuMemMapAlloc);
   it = detail::deserialize(it, this->transports);
   int8_t transportCount;
   it = detail::deserialize(it, transportCount);
@@ -176,18 +120,7 @@
     TransportInfo transportInfo;
     it = detail::deserialize(it, transportInfo.transport);
     if (transportInfo.transport == Transport::CudaIpc) {
-      if (this->isCuMemMapAlloc) {
-        if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
-          it = detail::deserialize(it, transportInfo.shareableHandle);
-        } else {
-          it = detail::deserialize(it, transportInfo.rootPid);
-          it = detail::deserialize(it, transportInfo.fileDesc);
-        }
-        it = detail::deserialize(it, transportInfo.offsetFromBase);
-      } else {
-        it = detail::deserialize(it, transportInfo.cudaIpcBaseHandle);
-        it = detail::deserialize(it, transportInfo.cudaIpcOffsetFromBase);
-      }
+      it = detail::deserialize(it, transportInfo.gpuIpcMemHandle);
     } else if (AllIBTransports.has(transportInfo.transport)) {
       it = detail::deserialize(it, transportInfo.ibMrInfo);
       transportInfo.ibLocal = false;
@@ -206,99 +139,16 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
     // The memory is local to the process, so originalDataPtr is valid as is
     this->data = this->originalDataPtr;
   } else if (transports.has(Transport::CudaIpc)) {
-    // The memory is local to the machine but not to the process, so we need to open the CUDA IPC handle
     auto entry = getTransportInfo(Transport::CudaIpc);
-    void* base;
-    if (this->isCuMemMapAlloc) {
-#if (CUDA_NVLS_API_AVAILABLE)
-      CUmemGenericAllocationHandle handle;
-      if (getHostHash() != this->hostHash) {
-        // TODO: only open handle if in same MNNVL domain
-        CUresult err = cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType());
-        if (err != CUDA_SUCCESS) {
-          INFO(MSCCLPP_P2P, "Failed to import shareable handle from host: 0x%lx, may not be in the same MNNVL domain",
-               hostHash);
-          return;
-        }
-      } else {
-        if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
-          MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType()));
-        } else {
-          int rootPidFd = syscall(SYS_pidfd_open, entry.rootPid, 0);
-          if (rootPidFd < 0) {
-            throw SysError("pidfd_open() failed", errno);
-          }
-          int fd = syscall(SYS_pidfd_getfd, rootPidFd, entry.fileDesc, 0);
-          if (fd < 0) {
-            throw SysError("pidfd_getfd() failed", errno);
-          }
-          INFO(MSCCLPP_P2P, "Get file descriptor %d from pidfd %d on peer 0x%lx", fd, rootPidFd, hostHash);
-          MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, reinterpret_cast<void*>(fd),
-                                                         CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
-          close(rootPidFd);
-          close(fd);
-        }
-      }
-      size_t minGran = detail::getMulticastGranularity(this->baseDataSize, CU_MULTICAST_GRANULARITY_MINIMUM);
-      size_t recommendedGran =
-          detail::getMulticastGranularity(this->baseDataSize, CU_MULTICAST_GRANULARITY_RECOMMENDED);
-      size_t size = (this->baseDataSize + recommendedGran - 1) / recommendedGran * recommendedGran;
-      MSCCLPP_CUTHROW(cuMemAddressReserve((CUdeviceptr*)&base, size, minGran, 0, 0));
-      MSCCLPP_CUTHROW(cuMemMap((CUdeviceptr)base, size, 0, handle, 0));
-      detail::setReadWriteMemoryAccess(base, size);
-      this->data = static_cast<char*>(base) + entry.offsetFromBase;
-#else
-      throw Error("CUDA does not support NVLS. Please ensure your CUDA version supports NVLS to use this feature.",
-                  ErrorCode::InvalidUsage);
-#endif
-    } else if (getHostHash() == this->hostHash) {
-      MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&base, entry.cudaIpcBaseHandle, cudaIpcMemLazyEnablePeerAccess));
-      this->data = static_cast<char*>(base) + entry.cudaIpcOffsetFromBase;
-    }
-  }
-  if (this->data != nullptr) {
-    INFO(MSCCLPP_P2P, "Opened CUDA IPC handle at pointer %p", this->data);
+    this->remoteGpuIpcMem = std::make_unique<GpuIpcMem>(entry.gpuIpcMemHandle);
+    this->data = this->remoteGpuIpcMem->map();
+    INFO(MSCCLPP_P2P, "Opened GpuIpcMemHandle at pointer %p", this->data);
   }
 }
 
 RegisteredMemory::Impl::Impl(const std::vector<char>& serialization)
     : Impl(serialization.begin(), serialization.end()) {}
 
-RegisteredMemory::Impl::~Impl() {
-  // Close the CUDA IPC handle if it was opened during deserialization or initialization
-  if (data && transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash) {
-    if (getPidHash() == this->pidHash) {
-      // For local registered memory
-      if (fileDesc >= 0) {
-        close(fileDesc);
-        fileDesc = -1;
-      }
-      return;
-    }
-    // For remote registered memory
-    void* base = static_cast<char*>(data) - getTransportInfo(Transport::CudaIpc).cudaIpcOffsetFromBase;
-    if (this->isCuMemMapAlloc) {
-      CUmemGenericAllocationHandle handle;
-      size_t size = 0;
-      MSCCLPP_CULOG_WARN(cuMemRetainAllocationHandle(&handle, base));
-      MSCCLPP_CULOG_WARN(cuMemRelease(handle));
-      MSCCLPP_CULOG_WARN(cuMemGetAddressRange(NULL, &size, (CUdeviceptr)base));
-      MSCCLPP_CULOG_WARN(cuMemUnmap((CUdeviceptr)base, size));
-      MSCCLPP_CULOG_WARN(cuMemRelease(handle));
-      MSCCLPP_CULOG_WARN(cuMemAddressFree((CUdeviceptr)base, size));
-    } else {
-      cudaError_t err = cudaIpcCloseMemHandle(base);
-      if (err != cudaSuccess) {
-        WARN("Failed to close CUDA IPC handle at pointer %p: %s", base, cudaGetErrorString(err));
-      } else {
-        INFO(MSCCLPP_P2P, "Closed CUDA IPC handle at pointer %p", base);
-      }
-    }
-    data = nullptr;
-    fileDesc = -1;
-  }
-}
-
 const TransportInfo& RegisteredMemory::Impl::getTransportInfo(Transport transport) const {
   for (auto& entry : transportInfos) {
     if (entry.transport == transport) {
diff --git a/src/switch_channel.cc b/src/switch_channel.cc
new file mode 100644
index 000000000..96c6cdf90
--- /dev/null
+++ b/src/switch_channel.cc
@@ -0,0 +1,78 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <memory>
+#include <vector>
+
+#include <mscclpp/core.hpp>
+#include <mscclpp/gpu.hpp>
+#include <mscclpp/gpu_utils.hpp>
+#include <mscclpp/switch_channel.hpp>
+
+#include "api.h"
+#include "connection.hpp"
+#include "debug.h"
+#include "endpoint.hpp"
+#include "gpu_ipc_mem.hpp"
+#include "serialization.hpp"
+
+namespace mscclpp {
+
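+// Binds a device buffer to the multicast group owned by the given NVLS
+// connection: maps the multicast memory into this process, then records the
+// unicast pointer (`data`, or the connection's multicast-backed buffer when
+// `data` is null) alongside the multicast pointer for device handles.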
+SwitchChannel::SwitchChannel(std::shared_ptr<NvlsConnection> nvlsConn, void* data, size_t bytes) {
+  if (!nvlsConn) {
+    throw Error("SwitchChannel requires a valid NvlsConnection", ErrorCode::InvalidUsage);
+  }
+  CudaIpcConnection* connection = dynamic_cast<CudaIpcConnection*>(nvlsConn->connection.get());
+  if (!connection) {
+    throw Error("Invalid connection type", ErrorCode::InvalidUsage);
+  }
+  if (!(connection->nvlsMem_)) {
+    throw Error("Connection does not hold NVLS memory", ErrorCode::InvalidUsage);
+  }
+  (void)connection->nvlsMem_->mapMulticast(static_cast<int>(connection->nvlsNumDevs_), CUdeviceptr(data), bytes);
+  devicePtr_ = data ? data : connection->nvlsMem_->multicastBuffer();
+  if (!devicePtr_) {
+    throw Error("SwitchChannel has no device pointer to bind", ErrorCode::InvalidUsage);
+  }
+  mcPtr_ = connection->nvlsMem_->data();
+  bufferSize_ = connection->nvlsMem_->size();
+}
+
+SwitchChannel::DeviceHandle SwitchChannel::deviceHandle() const {
+  SwitchChannel::DeviceHandle device;
+  device.devicePtr = devicePtr_;
+  device.mcPtr = mcPtr_;
+  device.bufferSize = bufferSize_;
+  return device;
+}
+
+void* SwitchChannel::getDevicePtr() { return devicePtr_; }
+
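+// Collectively builds the NVLS group over `allRanks` in a star topology: the
+// first rank creates the multicast object as the root, accepts one connection
+// from every peer, and also connects to itself so its own GPU joins the group;
+// every other rank creates a non-root NVLS endpoint and connects to the root.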
+MSCCLPP_API_CPP std::shared_ptr<NvlsConnection> connectNvlsCollective(std::shared_ptr<Communicator> comm,
+                                                                      std::vector<int> allRanks, size_t bufferSize) {
+  auto nvlsConnection = std::make_shared<NvlsConnection>();
+
+  EndpointConfig cfg;
+  cfg.transport = Transport::CudaIpc;
+  cfg.device = DeviceType::GPU;
+  cfg.nvls.numDevices = static_cast<int>(allRanks.size());
+  cfg.nvls.bufferSize = bufferSize;
+  if (comm->bootstrap()->getRank() == allRanks[0]) {
+    cfg.nvls.isRoot = true;
+    auto rootEndpoint = comm->context()->createEndpoint(cfg);
+    for (int peer = 1; peer < static_cast<int>(allRanks.size()); ++peer) {
+      nvlsConnection->rootPeerConnections.push_back(comm->connect(rootEndpoint, allRanks[peer]).get());
+    }
+    cfg.nvls.isRoot = false;
+    auto endpoint = comm->context()->createEndpoint(cfg);
+    nvlsConnection->rootSelfConnection = comm->context()->connect(rootEndpoint, endpoint);
+    nvlsConnection->connection = comm->context()->connect(endpoint, rootEndpoint);
+  } else {
+    cfg.nvls.isRoot = false;
+    auto endpoint = comm->context()->createEndpoint(cfg);
+    nvlsConnection->connection = comm->connect(endpoint, allRanks[0]).get();
+  }
+  return nvlsConnection;
+}
+
+}  // namespace mscclpp
diff --git a/test/mp_unit/switch_channel_tests.cu b/test/mp_unit/switch_channel_tests.cu
index da4cc9c22..c825242d6 100644
--- a/test/mp_unit/switch_channel_tests.cu
+++ b/test/mp_unit/switch_channel_tests.cu
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.
 
 #include <mscclpp/concurrency_device.hpp>
-#include <mscclpp/nvls.hpp>
+#include <mscclpp/switch_channel.hpp>
 #include <mscclpp/utils.hpp>
 
 #include "mp_unit_tests.hpp"
 
@@ -39,12 +39,12 @@ TEST_F(SwitchChannelTest, SimpleAllReduce) {
     ranks.push_back(i);
   }
 
-  auto buffer = mscclpp::GpuBuffer<float>(1024);
+  auto buffer = mscclpp::GpuBuffer<float>(1024 * 1024);
   float data = gEnv->rank + 1.0f;
   MSCCLPP_CUDATHROW(cudaMemcpy(buffer.data(), &data, sizeof(data), cudaMemcpyHostToDevice));
 
-  auto nvlsConnection = mscclpp::connectNvlsCollective(communicator, ranks, 1024);
-  auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), 1024);
+  auto nvlsConnection = mscclpp::connectNvlsCollective(communicator, ranks, buffer.bytes());
+  mscclpp::SwitchChannel switchChannel(nvlsConnection, buffer.data(), buffer.bytes());
 
   auto deviceHandle = switchChannel.deviceHandle();
   MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle)));
diff --git a/tools/install.sh b/tools/install.sh
index c191ae4d5..122bb002c 100755
--- a/tools/install.sh
+++ b/tools/install.sh
@@ -45,8 +45,12 @@ if [ ! -d "$INSTALL_DIR" ]; then
   echo "Error: Install directory '$INSTALL_DIR' does not exist."
   exit 1
 fi
-
-trap 'rm -rf "$TMP_BUILD_DIR"' EXIT
+if [ "${DEBUG:-0}" -eq 1 ]; then
+  BUILD_TYPE="Debug"
+else
+  BUILD_TYPE="Release"
+  trap 'rm -rf "$TMP_BUILD_DIR"' EXIT
+fi
 
 pushd "$TMP_BUILD_DIR" || exit 1
 
@@ -61,7 +65,7 @@ else
 fi
 
 $CMAKE \
-  -DCMAKE_BUILD_TYPE=Release \
+  -DCMAKE_BUILD_TYPE="$BUILD_TYPE" \
   -DCMAKE_INSTALL_PREFIX="$INSTALL_DIR" \
   -DMSCCLPP_BUILD_PYTHON_BINDINGS=OFF \
   -DMSCCLPP_BUILD_TESTS=OFF \