Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/nccl/src/allreduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#include <mscclpp/gpu_data_types.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/memory_channel_device.hpp>
#include <mscclpp/nvls.hpp>
#include <mscclpp/packet_device.hpp>
#include <mscclpp/switch_channel.hpp>
#include <type_traits>

#if defined(ENABLE_NPKIT)
Expand Down
4 changes: 2 additions & 2 deletions apps/nccl/src/nccl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <mscclpp/executor.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/memory_channel_device.hpp>
#include <mscclpp/nvls.hpp>
#include <mscclpp/switch_channel.hpp>
#include <mscclpp/utils.hpp>
#include <queue>
#include <sstream>
Expand Down Expand Up @@ -319,7 +319,7 @@ static std::vector<mscclpp::SwitchChannel> setupNvlsChannels(

for (size_t idx = 0; idx < NUM_NVLS_CONNECTION; ++idx) {
std::shared_ptr<mscclpp::NvlsConnection> nvlsConnection = conns[idx];
mscclpp::SwitchChannel SwitchChannel = nvlsConnection->bindAllocatedMemory((CUdeviceptr)buffer, bufferSize);
mscclpp::SwitchChannel SwitchChannel(nvlsConnection, buffer, bufferSize);
channels.push_back(SwitchChannel);
}
return channels;
Expand Down
2 changes: 0 additions & 2 deletions docs/cpp_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ Channel Device Interfaces
.. doxygenstruct:: mscclpp::BasePortChannelDeviceHandle
:members:

.. doxygenunion:: mscclpp::ChannelTrigger

.. doxygenunion:: mscclpp::LL16Packet

.. doxygenunion:: mscclpp::LL8Packet
Expand Down
118 changes: 83 additions & 35 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,41 +378,70 @@ struct Device {
int id;
};

/// Used to configure an endpoint.
/// Configuration for creating communication endpoints.
struct EndpointConfig {
static const int DefaultMaxCqSize = 1024;
static const int DefaultMaxCqPollNum = 1;
static const int DefaultMaxSendWr = 8192;
static const int DefaultMaxWrPerSend = 64;

/// InfiniBand-specific configuration options that control queue pair behavior and performance characteristics.
/// These settings are only used when the transport is an InfiniBand type (IB0-IB7); they are ignored for other
/// transports.
struct Ib {
  // The defaults are constexpr (implicitly inline since C++17) so they can be ODR-used -- e.g. bound to a
  // `const int&` or passed to std::min/std::max -- without needing an out-of-class definition.

  /// Default value of `maxCqSize`.
  static constexpr int DefaultMaxCqSize = 1024;
  /// Default value of `maxCqPollNum`.
  static constexpr int DefaultMaxCqPollNum = 1;
  /// Default value of `maxSendWr`.
  static constexpr int DefaultMaxSendWr = 8192;
  /// Default value of `maxWrPerSend`.
  static constexpr int DefaultMaxWrPerSend = 64;

  /// Maximum size of the completion queue.
  int maxCqSize;
  /// Maximum number of completion queue polls per operation.
  int maxCqPollNum;
  /// Maximum number of outstanding send work requests.
  int maxSendWr;
  /// Maximum number of work requests per send operation.
  int maxWrPerSend;

  /// Constructor. Every argument is optional and falls back to the matching `Default*` constant.
  /// Intentionally not `explicit`, so that the options can be copy-list-initialized with `{}`.
  /// @param maxCqSize Maximum completion queue size.
  /// @param maxCqPollNum Maximum completion queue poll count.
  /// @param maxSendWr Maximum outstanding send work requests.
  /// @param maxWrPerSend Maximum work requests per send operation.
  Ib(int maxCqSize = DefaultMaxCqSize, int maxCqPollNum = DefaultMaxCqPollNum, int maxSendWr = DefaultMaxSendWr,
     int maxWrPerSend = DefaultMaxWrPerSend)
      : maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), maxWrPerSend(maxWrPerSend) {}
};

/// NVLS-specific configuration options.
struct Nvls {
  /// Number of devices to enroll. Should be positive when using NVLS transport; defaults to -1.
  int numDevices;
  /// NVLS buffer size. Should be positive when using NVLS transport; defaults to 0.
  size_t bufferSize;
  /// Whether this is the root device in the NVLS group; defaults to false.
  bool isRoot;

  /// Constructor. All arguments are optional.
  /// @param numDevices Number of devices to enroll. Should be positive when using NVLS transport.
  /// @param bufferSize NVLS buffer size. Should be positive when using NVLS transport.
  /// @param isRoot Whether this is the root device in the NVLS group.
  Nvls(int numDevices = -1, size_t bufferSize = 0, bool isRoot = false)
      : numDevices{numDevices},
        bufferSize{bufferSize},
        isRoot{isRoot} {}
};

/// Communication transport type (e.g., CudaIpc, IB0-IB7, Ethernet).
Transport transport;
/// Target device for the endpoint (GPU or CPU with optional device ID).
Device device;
int ibMaxCqSize;
int ibMaxCqPollNum;
int ibMaxSendWr;
int ibMaxWrPerSend;
/// Maximum number of write requests that can be queued (-1 for default).
int maxWriteQueueSize;

/// Constructor that takes a transport and sets the other fields to their default values.
///
/// @param transport The transport to use.
/// @param device The device to use.
/// @param ibMaxCqSize The maximum completion queue size.
/// @param ibMaxCqPollNum The maximum completion queue poll number.
/// @param ibMaxSendWr The maximum send work requests.
/// @param ibMaxWrPerSend The maximum work requests per send.
/// @param maxWriteQueueSize The maximum write queue size.
EndpointConfig(Transport transport = Transport::Unknown, Device device = DeviceType::GPU,
int ibMaxCqSize = DefaultMaxCqSize, int ibMaxCqPollNum = DefaultMaxCqPollNum,
int ibMaxSendWr = DefaultMaxSendWr, int ibMaxWrPerSend = DefaultMaxWrPerSend,
int maxWriteQueueSize = -1)
: transport(transport),
device(device),
ibMaxCqSize(ibMaxCqSize),
ibMaxCqPollNum(ibMaxCqPollNum),
ibMaxSendWr(ibMaxSendWr),
ibMaxWrPerSend(ibMaxWrPerSend),
maxWriteQueueSize(maxWriteQueueSize) {}
/// InfiniBand-specific options (used only for Transport::IBx).
Ib ib;
/// NVLS-specific options (used only for Transport::Nvls).
Nvls nvls;

/// Builds an endpoint configuration from a transport, a device, and optional settings.
/// Every argument has a default, so this also serves as the default constructor.
/// @param transport Communication transport to use.
/// @param device Target device for the endpoint.
/// @param maxWriteQueueSize Maximum write queue size (-1 for system default).
/// @param ib IB-specific configuration.
/// @param nvls NVLS-specific configuration.
EndpointConfig(Transport transport = Transport::Unknown, Device device = DeviceType::GPU, int maxWriteQueueSize = -1,
               Ib ib = {}, Nvls nvls = {})
    : transport(transport),
      device(device),
      maxWriteQueueSize(maxWriteQueueSize),
      ib(ib),
      nvls(nvls) {}
};

class Context;
Expand All @@ -426,6 +455,10 @@ class Endpoint {
/// Constructor.
Endpoint() = default;

/// Get the configuration used.
/// @return The configuration used.
const EndpointConfig& config() const;

/// Get the transport used.
/// @return The transport used.
Transport transport() const;
Expand Down Expand Up @@ -687,9 +720,9 @@ class Semaphore {
std::shared_ptr<Impl> pimpl_;
};

/// Deprecated.
template <typename T>
using NonblockingFuture [[deprecated("Use std::shared_future instead. This will be removed in a future release.")]] =
std::shared_future<T>;
using NonblockingFuture = std::shared_future<T>;

/// A class that sets up all registered memories and connections between processes.
///
Expand Down Expand Up @@ -855,12 +888,20 @@ class Communicator {
/// on the last future, it will start receiving the five RegisteredMemory or Connection objects in order,
/// back to back.
///
/// @param localConfig The configuration for the local endpoint.
/// @param localEndpoint The local endpoint.
/// @param remoteRank The rank of the remote process.
/// @param tag The tag to use for identifying the send and receive.
/// @return A future of shared pointer to the connection.
///
std::shared_future<std::shared_ptr<Connection>> connect(EndpointConfig localConfig, int remoteRank, int tag = 0);
std::shared_future<std::shared_ptr<Connection>> connect(const Endpoint& localEndpoint, int remoteRank, int tag = 0);

/// Connect to a remote rank. Wrapper of `connect(localEndpoint, remoteRank, tag)`.
/// @param localConfig The configuration for the local endpoint.
/// @param remoteRank The rank of the remote process.
/// @param tag The tag to use for identifying the send and receive.
/// @return A future of shared pointer to the connection.
std::shared_future<std::shared_ptr<Connection>> connect(const EndpointConfig& localConfig, int remoteRank,
int tag = 0);

[[deprecated("Use connect(localConfig, remoteRank, tag) instead. This will be removed in a future release.")]] std::
shared_future<std::shared_ptr<Connection>>
Expand All @@ -873,6 +914,13 @@ class Communicator {
}

/// Build a semaphore for cross-process synchronization.
/// @param localStub The SemaphoreStub to be the local end of the semaphore.
/// @param remoteRank The rank of the remote process.
/// @param tag The tag to use for identifying the operation.
/// @return A future of the built semaphore.
std::shared_future<Semaphore> buildSemaphore(const SemaphoreStub& localStub, int remoteRank, int tag = 0);

/// Build a semaphore for cross-process synchronization. Wrapper of `buildSemaphore(localStub, remoteRank, tag)`.
/// @param connection The connection associated with this semaphore.
/// @param remoteRank The rank of the remote process.
/// @param tag The tag to use for identifying the operation.
Expand Down
24 changes: 15 additions & 9 deletions include/mscclpp/gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#ifndef MSCCLPP_GPU_HPP_
#define MSCCLPP_GPU_HPP_

#if defined(__HIP_PLATFORM_AMD__)
#include <mscclpp/device.hpp>

#include <hip/hip_runtime.h>
#if defined(MSCCLPP_DEVICE_HIP)

using cudaError_t = hipError_t;
using cudaEvent_t = hipEvent_t;
Expand Down Expand Up @@ -42,10 +42,14 @@ constexpr auto CU_MEM_ALLOCATION_TYPE_PINNED = hipMemAllocationTypePinned;
constexpr auto CU_MEM_LOCATION_TYPE_DEVICE = hipMemLocationTypeDevice;
constexpr auto CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR = hipMemHandleTypePosixFileDescriptor;
constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWrite;
constexpr auto CU_MEM_ALLOC_GRANULARITY_MINIMUM = hipMemAllocationGranularityMinimum;
constexpr auto CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL = HIP_POINTER_ATTRIBUTE_DEVICE_ORDINAL;

#ifndef CUDA_SUCCESS
#define CUDA_SUCCESS hipSuccess
#endif // CUDA_SUCCESS
#define CUDA_ERROR_NOT_SUPPORTED hipErrorNotSupported
#define CUDA_ERROR_INVALID_VALUE hipErrorInvalidValue

#define cudaEventCreate(...) hipEventCreate(__VA_ARGS__)
#define cudaEventCreateWithFlags(...) hipEventCreateWithFlags(__VA_ARGS__)
Expand Down Expand Up @@ -93,6 +97,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
#define cuMemAddressFree(...) hipMemAddressFree(__VA_ARGS__)
#define cuMemGetAddressRange(...) hipMemGetAddressRange(__VA_ARGS__)
#define cuMemGetAllocationGranularity(...) hipMemGetAllocationGranularity(__VA_ARGS__)
#define cuMemCreate(...) hipMemCreate(__VA_ARGS__)
#define cuMemRelease(...) hipMemRelease(__VA_ARGS__)
#define cuMemSetAccess(...) hipMemSetAccess(__VA_ARGS__)
Expand All @@ -101,30 +106,31 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cuMemRetainAllocationHandle(...) hipMemRetainAllocationHandle(__VA_ARGS__)
#define cuMemExportToShareableHandle(...) hipMemExportToShareableHandle(__VA_ARGS__)
#define cuMemImportFromShareableHandle(...) hipMemImportFromShareableHandle(__VA_ARGS__)
#define cuPointerGetAttribute(...) hipPointerGetAttribute(__VA_ARGS__)

#else
#else // !defined(MSCCLPP_DEVICE_HIP)

#include <cuda.h>
#include <cuda_runtime.h>

#endif
#endif // !defined(MSCCLPP_DEVICE_HIP)

// NVLS
#if !defined(__HIP_PLATFORM_AMD__)
#if !defined(MSCCLPP_DEVICE_HIP)
#include <linux/version.h>
#if CUDART_VERSION < 12030
#define CU_MEM_HANDLE_TYPE_FABRIC ((CUmemAllocationHandleType)0x8ULL)
#endif
// The NVLS API requires CUDA 12.3 or later and Linux kernel 5.6.0 or later.
#define CUDA_NVLS_API_AVAILABLE ((CUDART_VERSION >= 12030) && (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0)))
#else // defined(__HIP_PLATFORM_AMD__)
#else // defined(MSCCLPP_DEVICE_HIP)
#define CUDA_NVLS_API_AVAILABLE 0
// NVLS is not supported on AMD platforms; this is defined only to avoid compilation errors.
#define CU_MEM_HANDLE_TYPE_FABRIC (0x8ULL)
#endif // !defined(__HIP_PLATFORM_AMD__)
#define CU_MEM_HANDLE_TYPE_FABRIC ((hipMemAllocationHandleType)0x8ULL)
#endif // !defined(MSCCLPP_DEVICE_HIP)

// GPU sync threads
#if defined(__HIP_PLATFORM_AMD__)
#if defined(MSCCLPP_DEVICE_HIP)
#define __syncshm() asm volatile("s_waitcnt lgkmcnt(0) \n s_barrier");
#else
#define __syncshm() __syncthreads();
Expand Down
1 change: 0 additions & 1 deletion include/mscclpp/gpu_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ void* gpuCallocHost(size_t bytes, unsigned int flags);
void* gpuCallocUncached(size_t bytes);
#endif // defined(__HIP_PLATFORM_AMD__)
#if (CUDA_NVLS_API_AVAILABLE)
extern CUmemAllocationHandleType nvlsCompatibleMemHandleType;
void* gpuCallocPhysical(size_t bytes, size_t gran = 0, size_t align = 0);
#endif // CUDA_NVLS_API_AVAILABLE

Expand Down
7 changes: 3 additions & 4 deletions include/mscclpp/memory_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
#ifndef MSCCLPP_MEMORY_CHANNEL_HPP_
#define MSCCLPP_MEMORY_CHANNEL_HPP_

#include <mscclpp/core.hpp>
#include <mscclpp/memory_channel_device.hpp>
#include <mscclpp/semaphore.hpp>
#include <type_traits>

#include "core.hpp"
#include "memory_channel_device.hpp"
#include "semaphore.hpp"

namespace mscclpp {

/// Memory channel without specifying source/destination memory regions.
Expand Down
71 changes: 0 additions & 71 deletions include/mscclpp/nvls.hpp

This file was deleted.

Loading
Loading