diff --git a/.azure-pipelines/templates/integration-test.yaml b/.azure-pipelines/templates/integration-test.yaml
index ea0668be2..1513493f9 100644
--- a/.azure-pipelines/templates/integration-test.yaml
+++ b/.azure-pipelines/templates/integration-test.yaml
@@ -224,7 +224,9 @@ steps:
             export PATH=/usr/local/mpi/bin:\$PATH; \
             export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \
             cd /root/mscclpp; \
-            ./build/test/perf/fifo_test"'
+            ./build/test/perf/fifo_test; \
+            echo \"mpirun --allow-run-as-root -np 8 ./build/test/perf/fifo_test_multi_gpu\"; \
+            mpirun --allow-run-as-root -np 8 ./build/test/perf/fifo_test_multi_gpu"'
       kill $CHILD_PID
     workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/test/perf/CMakeLists.txt b/test/perf/CMakeLists.txt
index 56f9e8c2e..c0ff6baf1 100644
--- a/test/perf/CMakeLists.txt
+++ b/test/perf/CMakeLists.txt
@@ -40,3 +40,4 @@ endfunction()
 # Add FIFO test
 add_perf_test_executable(fifo_test "framework.cc;fifo_test.cu")
+add_perf_test_executable(fifo_test_multi_gpu "framework.cc;fifo_test_multi_gpu.cu")
diff --git a/test/perf/fifo_test_multi_gpu.cu b/test/perf/fifo_test_multi_gpu.cu
new file mode 100644
index 000000000..c6c503045
--- /dev/null
+++ b/test/perf/fifo_test_multi_gpu.cu
@@ -0,0 +1,383 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <getopt.h>
+#include <mpi.h>
+
+#include <algorithm>
+#include <cmath>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <mscclpp/core.hpp>
+#include <mscclpp/gpu_utils.hpp>
+#include <mscclpp/numa.hpp>
+#include <mscclpp/port_channel.hpp>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "framework.hpp"
+
+using namespace mscclpp::test;
+
+// Constants for trigger calculation
+constexpr int MIN_TRIGGERS = 1000;
+constexpr int TRIGGERS_PER_FIFO_SIZE = 10;
+
+__constant__ mscclpp::PortChannelDeviceHandle gPortChannel;
+
+struct MultiGpuTestConfig {
+  int fifoSize;
+  int numGpus;    // Total number of GPUs
+  int numGroups;  // Number of groups
+  std::vector<int> parallelismLevels;
+
+  MultiGpuTestConfig(int size, int gpus, int groups, const std::vector<int>& parallel = {64, 128, 256, 512})
+      : fifoSize(size), numGpus(gpus), numGroups(groups), parallelismLevels(parallel) {
+    if (numGpus % numGroups != 0) {
+      throw std::invalid_argument("Number of GPUs must be divisible by number of groups");
+    }
+  }
+
+  int getGroupSize() const { return numGpus / numGroups; }
+  int getGroupIndex(int rank) const { return rank / getGroupSize(); }
+  int getLocalRankInGroup(int rank) const { return rank % getGroupSize(); }
+
+  // Get all ranks that participate in cross-group signaling (local rank 0 from each group)
+  std::vector<int> getCrossGroupSignalingRanks() const {
+    std::vector<int> signalingRanks;
+    for (int group = 0; group < numGroups; group++) {
+      int localRank0 = group * getGroupSize();  // First rank in each group
+      signalingRanks.push_back(localRank0);
+    }
+    return signalingRanks;
+  }
+
+  // Check if this rank should participate in cross-group signaling
+  bool shouldParticipateInSignaling(int rank) const {
+    return getLocalRankInGroup(rank) == 0;  // Only local rank 0 in each group participates
+  }
+};
+
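+// Worked example of the group arithmetic above (illustration only): with
+// numGpus = 8 and numGroups = 2, getGroupSize() = 4, so ranks 0-3 form group 0
+// and ranks 4-7 form group 1. getCrossGroupSignalingRanks() returns {0, 4},
+// while a rank such as 5 (group 1, local rank 1) does not signal.
+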
+// Enhanced kernels for multi-GPU signaling. Each participating thread issues
+// numTriggers rounds of signals so the work matches the totalSignals
+// accounting in the measurement code below.
+__global__ void kernelMultiGpuSignalSend(mscclpp::PortChannelDeviceHandle* portHandles, int numPeers, int numParallel,
+                                         int numTriggers) {
+  int tid = blockIdx.x * blockDim.x + threadIdx.x;
+
+  // Each participating thread sends signals to all peers
+  if (tid < numParallel) {
+    for (int t = 0; t < numTriggers; t++) {
+      for (int peer = 0; peer < numPeers; peer++) {
+        portHandles[peer].signal();
+      }
+    }
+  }
+}
+
+__global__ void kernelMultiGpuSignalWait(mscclpp::PortChannelDeviceHandle* portHandles, int numPeers, int numParallel,
+                                         int numTriggers) {
+  int tid = blockIdx.x * blockDim.x + threadIdx.x;
+
+  // Each participating thread waits for signals from all peers
+  if (tid < numParallel) {
+    for (int t = 0; t < numTriggers; t++) {
+      for (int peer = 0; peer < numPeers; peer++) {
+        portHandles[peer].wait();
+      }
+    }
+  }
+}
+
+static void setupCuda(int& cudaDevice, int& numaNode) {
+  utils::CUDA_CHECK(cudaGetDevice(&cudaDevice));
+  numaNode = mscclpp::getDeviceNumaNode(cudaDevice);
+  mscclpp::numaBind(numaNode);
+}
+
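+// Note on the signal/wait path (a summary of the port-channel design, not new
+// behavior): signal() enqueues a trigger in the proxy FIFO, the host-side
+// proxy thread pops it and performs the transport write that bumps the remote
+// semaphore, and wait() spins until the local semaphore reaches the expected
+// count. The FIFO depth bounds how many triggers can be outstanding at once,
+// which is why the kernel parallelism is clamped to fifoSize below.
+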
+// Enhanced performance measurement for one parallelism level. Returns
+// {throughput in signals/s, duration in microseconds, total signal/wait events}.
+std::tuple<double, double, int> runMultiGpuKernelVariant(
+    cudaStream_t stream, int numParallel, int rank,
+    const std::vector<mscclpp::PortChannelDeviceHandle>& sendPortHandles,
+    const std::vector<mscclpp::PortChannelDeviceHandle>& recvPortHandles, const MultiGpuTestConfig& config) {
+  // Calculate triggers based on the FIFO size, and clamp parallelism to the FIFO depth
+  const int maxParallel = std::min(numParallel, config.fifoSize);
+  const int numTriggers = std::max(MIN_TRIGGERS, config.fifoSize * TRIGGERS_PER_FIFO_SIZE);
+
+  // Configure kernel launch parameters
+  int threadsPerBlock = std::min(maxParallel, 256);
+  int threadBlocks = (maxParallel + threadsPerBlock - 1) / threadsPerBlock;
+
+  // Copy port handles to device memory using MSCCLPP gpuCallocShared
+  std::shared_ptr<mscclpp::PortChannelDeviceHandle> d_sendHandles = nullptr;
+  std::shared_ptr<mscclpp::PortChannelDeviceHandle> d_recvHandles = nullptr;
+
+  if (!sendPortHandles.empty()) {
+    d_sendHandles = mscclpp::detail::gpuCallocShared<mscclpp::PortChannelDeviceHandle>(sendPortHandles.size());
+    mscclpp::gpuMemcpy(d_sendHandles.get(), sendPortHandles.data(), sendPortHandles.size(), cudaMemcpyHostToDevice);
+  }
+
+  if (!recvPortHandles.empty()) {
+    d_recvHandles = mscclpp::detail::gpuCallocShared<mscclpp::PortChannelDeviceHandle>(recvPortHandles.size());
+    mscclpp::gpuMemcpy(d_recvHandles.get(), recvPortHandles.data(), recvPortHandles.size(), cudaMemcpyHostToDevice);
+  }
+
+  // Benchmark
+  utils::Timer timer;
+  timer.start();
+
+  bool shouldSignal = config.shouldParticipateInSignaling(rank);
+
+  if (shouldSignal) {
+    // Launch the signaling kernel
+    if (!sendPortHandles.empty()) {
+      kernelMultiGpuSignalSend<<<threadBlocks, threadsPerBlock, 0, stream>>>(
+          d_sendHandles.get(), static_cast<int>(sendPortHandles.size()), maxParallel, numTriggers);
+      utils::CUDA_CHECK(cudaGetLastError());
+    }
+
+    // Launch the waiting kernel
+    if (!recvPortHandles.empty()) {
+      kernelMultiGpuSignalWait<<<threadBlocks, threadsPerBlock, 0, stream>>>(
+          d_recvHandles.get(), static_cast<int>(recvPortHandles.size()), maxParallel, numTriggers);
+      utils::CUDA_CHECK(cudaGetLastError());
+    }
+  }
+
+  utils::CUDA_CHECK(cudaStreamSynchronize(stream));
+  timer.stop();
+
+  const int totalSignals =
+      numTriggers * maxParallel * static_cast<int>(sendPortHandles.size() + recvPortHandles.size());
+  double throughput = totalSignals / timer.elapsedSeconds();
+  double duration_us = timer.elapsedMicroseconds();
+
+  utils::CUDA_CHECK(cudaDeviceSynchronize());
+
+  return {throughput, duration_us, totalSignals};
+}
+
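+// Example of the accounting above (illustration only): with fifoSize = 512,
+// numTriggers = max(1000, 512 * 10) = 5120. For two signaling ranks with one
+// send and one recv channel each at maxParallel = 256, each rank counts
+// totalSignals = 5120 * 256 * (1 + 1) = 2,621,440 signal/wait events.
+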
+// Main multi-GPU test function
+void runMultiGpuTest(const MultiGpuTestConfig& config, const mscclpp::test::TestContext& context) {
+  int rank = context.rank;
+  int worldSize = context.size;
+  auto communicator = context.communicator;
+  auto bootstrap = context.bootstrap;
+
+  if (worldSize != config.numGpus) {
+    throw std::invalid_argument("World size must match number of GPUs in config");
+  }
+
+  // Set the device for this process
+  utils::CUDA_CHECK(cudaSetDevice(rank));
+
+  // Setup transport
+  mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc;
+  std::vector<mscclpp::Transport> ibTransports{
+      mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, mscclpp::Transport::IB3,
+      mscclpp::Transport::IB4, mscclpp::Transport::IB5, mscclpp::Transport::IB6, mscclpp::Transport::IB7};
+  std::vector<std::shared_ptr<mscclpp::Connection>> connections;
+
+  // Only create connections for GPUs that need to communicate
+  if (config.shouldParticipateInSignaling(rank)) {
+    mscclpp::Transport selectedTransport = ibTransports[rank % ibTransports.size()];
+    transport |= selectedTransport;
+
+    // Get all ranks that participate in cross-group signaling
+    auto signalingRanks = config.getCrossGroupSignalingRanks();
+
+    for (int peerRank : signalingRanks) {
+      if (peerRank != rank) {
+        connections.push_back(communicator->connect(selectedTransport, peerRank).get());
+      }
+    }
+  }
+
+  // Wait for all connections to be established
+  bootstrap->barrier();
+
+  // Create and start the proxy service
+  auto proxyService = std::make_shared<mscclpp::ProxyService>(config.fifoSize);
+  proxyService->startProxy();
+
+  // Setup semaphore flags
+  uint64_t* localSemaphoreFlag;
+  utils::CUDA_CHECK(cudaMalloc(&localSemaphoreFlag, sizeof(uint64_t)));
+  utils::CUDA_CHECK(cudaMemset(localSemaphoreFlag, 0, sizeof(uint64_t)));
+  auto localFlagRegmem = communicator->registerMemory(localSemaphoreFlag, sizeof(uint64_t), transport);
+
+  int cudaDevice, numaNode;
+  setupCuda(cudaDevice, numaNode);
+
+  cudaStream_t stream;
+  utils::CUDA_CHECK(cudaStreamCreate(&stream));
+
+  // Setup port channels for communication
+  std::vector<mscclpp::PortChannelDeviceHandle> sendPortHandles;
+  std::vector<mscclpp::PortChannelDeviceHandle> recvPortHandles;
+
+  if (config.shouldParticipateInSignaling(rank)) {
+    // Get all ranks that participate in cross-group signaling
+    auto signalingRanks = config.getCrossGroupSignalingRanks();
+    size_t connIndex = 0;
+
+    for (int peerRank : signalingRanks) {
+      if (peerRank != rank && connIndex < connections.size()) {
+        auto connection = connections[connIndex++];
+        auto semaphoreId = proxyService->buildAndAddSemaphore(*communicator, connection);
+
+        // Create port channels for bidirectional communication
+        auto sendPortChannel = proxyService->portChannel(semaphoreId, proxyService->addMemory(localFlagRegmem),
+                                                         proxyService->addMemory(localFlagRegmem));
+        auto recvPortChannel = proxyService->portChannel(semaphoreId, proxyService->addMemory(localFlagRegmem),
+                                                         proxyService->addMemory(localFlagRegmem));
+
+        sendPortHandles.push_back(sendPortChannel.deviceHandle());
+        recvPortHandles.push_back(recvPortChannel.deviceHandle());
+      }
+    }
+  }
+
+  // Create test name
+  std::string testName = "MultiGpuTest_GPUs" + std::to_string(config.numGpus) + "_Groups" +
+                         std::to_string(config.numGroups) + "_FifoSize" + std::to_string(config.fifoSize);
+
+  // Print test configuration
+  if (utils::isMainRank()) {
+    std::cout << "Running Multi-GPU test: " << config.numGpus << " GPUs, " << config.numGroups
+              << " groups, FIFO size=" << config.fifoSize << std::endl;
+
+    // Print which ranks participate in cross-group signaling
+    auto signalingRanks = config.getCrossGroupSignalingRanks();
+    std::cout << "Cross-group signaling participants: ";
+    for (size_t i = 0; i < signalingRanks.size(); ++i) {
+      if (i > 0) std::cout << ", ";
+      std::cout << "rank " << signalingRanks[i] << " (group " << config.getGroupIndex(signalingRanks[i]) << ")";
+    }
+    std::cout << std::endl;
+  }
+
+  nlohmann::ordered_json combinedMetrics;
+
+  // Run tests for the different parallelism levels
+  for (int numParallel : config.parallelismLevels) {
+    // Ensure parallelism doesn't exceed the FIFO size
+    int effectiveParallel = std::min(numParallel, config.fifoSize);
+
+    // Synchronize before each test iteration
+    MPI_Barrier(MPI_COMM_WORLD);
+
+    if (config.shouldParticipateInSignaling(rank)) {
+      auto [throughput, duration, totalSignals] =
+          runMultiGpuKernelVariant(stream, effectiveParallel, rank, sendPortHandles, recvPortHandles, config);
+
+      std::string prefix = "p" + std::to_string(effectiveParallel) + "_";
+      combinedMetrics[prefix + "throughput_signals_per_sec"] = std::round(throughput * 10) / 10.0;
+      combinedMetrics[prefix + "duration_us"] = duration;
+      combinedMetrics[prefix + "total_signals"] = totalSignals;
+      combinedMetrics[prefix + "participating_gpus"] = config.numGpus;
+    }
+
+    // Synchronize after each test iteration
+    MPI_Barrier(MPI_COMM_WORLD);
+  }
+
+  // Record results
+  std::map<std::string, std::string> testParams;
+  testParams["num_gpus"] = std::to_string(config.numGpus);
+  testParams["num_groups"] = std::to_string(config.numGroups);
+  testParams["group_size"] = std::to_string(config.getGroupSize());
+  testParams["fifo_size"] = std::to_string(config.fifoSize);
+  testParams["participating_in_signaling"] = config.shouldParticipateInSignaling(rank) ? "true" : "false";
+
+  // Add information about cross-group signaling ranks
+  if (config.shouldParticipateInSignaling(rank)) {
+    auto signalingRanks = config.getCrossGroupSignalingRanks();
+    std::stringstream ss;
+    for (size_t i = 0; i < signalingRanks.size(); ++i) {
+      if (i > 0) ss << ",";
+      ss << signalingRanks[i];
+    }
+    testParams["cross_group_signaling_ranks"] = ss.str();
+  }
+
+  utils::recordResult(testName, "multi_gpu_signaling", combinedMetrics, testParams);
+
+  // Cleanup
+  utils::CUDA_CHECK(cudaStreamDestroy(stream));
+  utils::CUDA_CHECK(cudaFree(localSemaphoreFlag));
+  proxyService->stopProxy();
+}
+
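+// The configurations below all use 8 GPUs and vary only the group count, so
+// the number of cross-group signaling ranks grows from 2 to 8 while the FIFO
+// size stays fixed at 512; any config whose numGpus does not match the MPI
+// world size is skipped.
+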
+void runAllMultiGpuTests(const mscclpp::test::TestContext& context) {
+  std::vector<MultiGpuTestConfig> configs = {
+      // 8 GPUs, 2 groups (4 GPUs per group) - local rank 0 of each group participates in signaling
+      MultiGpuTestConfig(512, 8, 2, {1, 8, 64, 128, 256, 512}),
+
+      // 8 GPUs, 4 groups (2 GPUs per group) - local rank 0 of each group participates in signaling
+      MultiGpuTestConfig(512, 8, 4, {1, 8, 64, 128, 256, 512}),
+
+      // 8 GPUs, 8 groups (1 GPU per group) - every rank participates in signaling
+      MultiGpuTestConfig(512, 8, 8, {1, 8, 64, 128, 256, 512}),
+  };
+
+  for (const auto& config : configs) {
+    // Only run if we have the right number of GPUs
+    if (context.size == config.numGpus) {
+      runMultiGpuTest(config, context);
+    }
+  }
+}
+
+static void printUsage(char* argv0) {
+  std::stringstream ss;
+  ss << "Usage: " << argv0 << " [OPTIONS]\n"
+     << "\n"
+     << "Options:\n"
+     << "  -o, --output-format FORMAT  Output format: human or json (default: human)\n"
+     << "  -f, --output-file FILE      JSON output file path (default: report.jsonl)\n"
+     << "  -v, --verbose               Increase verbosity\n"
+     << "  -h, --help                  Show this help message\n";
+  std::cout << ss.str();
+}
+
+int main(int argc, char* argv[]) {
+  std::string outputFormat = "human";
+  std::string outputFile = "report.jsonl";
+  bool verbose = false;
+
+  static struct option longOptions[] = {{"output-format", required_argument, 0, 'o'},
+                                        {"output-file", required_argument, 0, 'f'},
+                                        {"verbose", no_argument, 0, 'v'},
+                                        {"help", no_argument, 0, 'h'},
+                                        {0, 0, 0, 0}};
+
+  int c;
+  while ((c = getopt_long(argc, argv, "o:f:vh", longOptions, nullptr)) != -1) {
+    switch (c) {
+      case 'o':
+        outputFormat = optarg;
+        break;
+      case 'f':
+        outputFile = optarg;
+        break;
+      case 'v':
+        verbose = true;
+        break;
+      case 'h':
+        printUsage(argv[0]);
+        return 0;
+      default:
+        printUsage(argv[0]);
+        return 1;
+    }
+  }
+
+  std::vector<std::tuple<std::string, std::string, std::function<void(const TestContext&)>>> tests = {
+      {"AllMultiGpuTests", "Multi-GPU signaling tests with configurable groups", runAllMultiGpuTests}};
+
+  int result = utils::runMultipleTests(argc, argv, tests);
+
+  if (utils::isMainRank()) {
+    if (outputFormat == "json") {
+      utils::writeResultsToFile(outputFile);
+    } else {
+      utils::printResults(verbose);
+    }
+  }
+
+  utils::cleanupMPI();
+
+  return result;
+}
diff --git a/test/perf/framework.cc b/test/perf/framework.cc
index 85f7abd81..6df157d35 100644
--- a/test/perf/framework.cc
+++ b/test/perf/framework.cc
@@ -145,9 +145,8 @@ void cudaCheck(cudaError_t err, const char* file, int line) {
   }
 }
 
-int runMultipleTests(
-    int argc, char* argv[],
-    const std::vector<std::tuple<std::string, std::string, std::function<void(int, int, int)>>>& tests) {
+int runMultipleTests(int argc, char* argv[],
+                     const std::vector<std::tuple<std::string, std::string, TestFunction>>& tests) {
   int totalResult = 0;
 
   // Initialize MPI once for all tests
@@ -159,10 +158,47 @@
   int size = getMPISize();
   int local_rank = rank;  // For simplicity, assume local_rank = rank
 
+  // Check if any test needs a TestContext
+  bool needsTestContext = false;
+  for (const auto& test : tests) {
+    const TestFunction& testFunction = std::get<2>(test);
+    if (std::holds_alternative<std::function<void(const TestContext&)>>(testFunction)) {
+      needsTestContext = true;
+      break;
+    }
+  }
+
+  // Only create the communicator and bootstrap if needed
+  std::shared_ptr<mscclpp::TcpBootstrap> bootstrap;
+  std::shared_ptr<mscclpp::Communicator> comm;
+  TestContext context;
+
+  if (needsTestContext) {
+    bootstrap = std::make_shared<mscclpp::TcpBootstrap>(rank, size);
+    mscclpp::UniqueId id;
+    if (isMainProcess()) {
+      id = mscclpp::TcpBootstrap::createUniqueId();
+    }
+    MPI_Bcast((void*)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
+    bootstrap->initialize(id);
+
+    cudaSetDevice(rank);
+    comm = std::make_shared<mscclpp::Communicator>(bootstrap);
+    bootstrap->barrier();
+
+    // Initialize the test context
+    context.rank = rank;
+    context.size = size;
+    context.local_rank = local_rank;
+    context.communicator = comm;
+    context.bootstrap = bootstrap;
+  }
+
   for (const auto& test : tests) {
     const std::string& testName = std::get<0>(test);
     const std::string& testDescription = std::get<1>(test);
-    const std::function<void(int, int, int)>& testFunction = std::get<2>(test);
+    const TestFunction& testFunction = std::get<2>(test);
 
     if (rank == 0) {
       std::cout << "Running test: " << testName << std::endl;
     }
   }
@@ -171,12 +207,17 @@
-    // Don't clear results - accumulate them for all tests in the same file
-    // g_results.clear();  // Commented out to accumulate results
-
     try {
-      // Run the individual test function with MPI information
-      testFunction(rank, size, local_rank);
+      // Run the appropriate test function based on its type
+      if (std::holds_alternative<std::function<void(int, int, int)>>(testFunction)) {
+        // Legacy API
+        const auto& legacyFunction = std::get<std::function<void(int, int, int)>>(testFunction);
+        legacyFunction(rank, size, local_rank);
+      } else {
+        // New API - pass the TestContext
+        const auto& newFunction = std::get<std::function<void(const TestContext&)>>(testFunction);
+        newFunction(context);
+      }
 
       // Synchronize before moving to next test
       MPI_Barrier(MPI_COMM_WORLD);
@@ -189,20 +230,41 @@
     }
   }
 
-  // Don't cleanup MPI here - let the caller handle it
-  // finalizeMPI();
-
   } catch (const std::exception& e) {
     if (g_mpi_rank == 0) {
       std::cerr << "Error: " << e.what() << std::endl;
     }
-    finalizeMPI();
+    return 1;
   }
 
   return totalResult;
 }
 
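+// Usage sketch for the unified entry point (hypothetical caller, for
+// illustration only): a test registers either signature, and the dispatcher
+// above selects the matching variant alternative:
+//
+//   void legacyTest(int rank, int size, int local_rank) { /* ... */ }
+//   void contextTest(const TestContext& ctx) { ctx.bootstrap->barrier(); }
+//
+//   std::vector<std::tuple<std::string, std::string, TestFunction>> tests = {
+//       {"legacy", "plain MPI info", TestFunction(legacyTest)},
+//       {"context", "shared Communicator/bootstrap", TestFunction(contextTest)}};
+//   runMultipleTests(argc, argv, tests);
+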
+int runMultipleTests(
+    int argc, char* argv[],
+    const std::vector<std::tuple<std::string, std::string, std::function<void(int, int, int)>>>& tests) {
+  // Convert to the unified format
+  std::vector<std::tuple<std::string, std::string, TestFunction>> unifiedTests;
+  for (const auto& test : tests) {
+    unifiedTests.emplace_back(std::get<0>(test), std::get<1>(test), TestFunction(std::get<2>(test)));
+  }
+
+  return runMultipleTests(argc, argv, unifiedTests);
+}
+
+int runMultipleTests(
+    int argc, char* argv[],
+    const std::vector<std::tuple<std::string, std::string, std::function<void(const TestContext&)>>>& tests) {
+  // Convert to the unified format
+  std::vector<std::tuple<std::string, std::string, TestFunction>> unifiedTests;
+  for (const auto& test : tests) {
+    unifiedTests.emplace_back(std::get<0>(test), std::get<1>(test), TestFunction(std::get<2>(test)));
+  }
+
+  return runMultipleTests(argc, argv, unifiedTests);
+}
+
 }  // namespace utils
 }  // namespace test
 }  // namespace mscclpp
diff --git a/test/perf/framework.hpp b/test/perf/framework.hpp
index e9b8c31f5..d66bfa503 100644
--- a/test/perf/framework.hpp
+++ b/test/perf/framework.hpp
@@ -10,15 +10,30 @@
 #include <cuda_runtime.h>
 #include <functional>
 #include <map>
+#include <memory>
 #include <nlohmann/json.hpp>
 #include <sstream>
 #include <string>
 #include <tuple>
+#include <variant>
 #include <vector>
 
 namespace mscclpp {
+
+// Forward declarations (full definitions come from the MSCCLPP core headers)
+class Communicator;
+class Connection;
+class TcpBootstrap;
+
 namespace test {
 
+// Test context structure containing MPI information and shared MSCCLPP objects
+struct TestContext {
+  int rank;
+  int size;
+  int local_rank;
+  std::shared_ptr<Communicator> communicator;
+  std::shared_ptr<TcpBootstrap> bootstrap;
+};
+
 // Test result structure
 struct TestResult {
   std::string test_name;
@@ -33,11 +48,24 @@
 
 // Simple utility functions for testing
 namespace utils {
 
+// Test function variant type
+using TestFunction = std::variant<std::function<void(int, int, int)>,        // Legacy API
+                                  std::function<void(const TestContext&)>>;  // New API
+
 // Test execution utilities
 int runMultipleTests(
     int argc, char* argv[],
     const std::vector<std::tuple<std::string, std::string, std::function<void(int, int, int)>>>& tests);
 
+int runMultipleTests(
+    int argc, char* argv[],
+    const std::vector<std::tuple<std::string, std::string, std::function<void(const TestContext&)>>>& tests);
+
+// Unified test execution API
+int runMultipleTests(int argc, char* argv[],
+                     const std::vector<std::tuple<std::string, std::string, TestFunction>>& tests);
+
 // MPI management
 void initializeMPI(int argc, char* argv[]);
 void cleanupMPI();