From 6cd8960394e14965fc9abcd702564fd2bdbac970 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Thu, 15 Jun 2023 20:55:57 +0800 Subject: [PATCH] DirectChannel Unit Tests (#102) * Add DirectChannel unit tests * Split mp_unit_tests.cu into multiple files --- include/mscclpp/channel.hpp | 2 - test/CMakeLists.txt | 14 +- test/mp_unit/CMakeLists.txt | 8 + test/mp_unit/bootstrap_tests.cc | 136 ++++ test/mp_unit/communicator_tests.cu | 276 ++++++++ test/mp_unit/device_channel_tests.cu | 138 ++++ test/mp_unit/direct_channel_tests.cu | 227 +++++++ test/mp_unit/ib_tests.cu | 287 ++++++++ test/mp_unit/mp_unit_tests.cc | 83 +++ test/mp_unit/mp_unit_tests.hpp | 144 ++++ test/mp_unit_tests.cu | 976 --------------------------- 11 files changed, 1307 insertions(+), 984 deletions(-) create mode 100644 test/mp_unit/CMakeLists.txt create mode 100644 test/mp_unit/bootstrap_tests.cc create mode 100644 test/mp_unit/communicator_tests.cu create mode 100644 test/mp_unit/device_channel_tests.cu create mode 100644 test/mp_unit/direct_channel_tests.cu create mode 100644 test/mp_unit/ib_tests.cu create mode 100644 test/mp_unit/mp_unit_tests.cc create mode 100644 test/mp_unit/mp_unit_tests.hpp delete mode 100644 test/mp_unit_tests.cu diff --git a/include/mscclpp/channel.hpp b/include/mscclpp/channel.hpp index a625ac78..88f9e3ac 100644 --- a/include/mscclpp/channel.hpp +++ b/include/mscclpp/channel.hpp @@ -326,8 +326,6 @@ struct DirectChannel { for (size_t i = threadId; i < nElem; i += numThreads) { ChannelPacket* pkt = &tmpBase[i]; srcBase[i] = pkt->read(flag); - // for future reuse - pkt->clear(); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 28dd43d9..404986d5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -12,19 +12,21 @@ endfunction() add_test_executable(allgather_test_cpp allgather_test_cpp.cu) add_test_executable(allgather_test_host_offloading allgather_test_host_offloading.cu) -add_executable(mp_unit_tests mp_unit_tests.cu) -target_link_libraries(mp_unit_tests mscclpp CUDA::cudart CUDA::cuda_driver MPI::MPI_CXX GTest::gtest_main GTest::gmock_main) -target_include_directories(mp_unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include) -gtest_discover_tests(mp_unit_tests DISCOVERY_MODE PRE_TEST) - configure_file(run_mpi_test.sh.in run_mpi_test.sh) # Unit tests add_executable(unit_tests) target_link_libraries(unit_tests GTest::gtest_main GTest::gmock_main mscclpp CUDA::cudart CUDA::cuda_driver) target_include_directories(unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include) -add_subdirectory(unit) # This adds the sources to the mscclpp target +add_subdirectory(unit) gtest_discover_tests(unit_tests DISCOVERY_MODE PRE_TEST) +# Multi-process unit tests +add_executable(mp_unit_tests) +target_link_libraries(mp_unit_tests mscclpp CUDA::cudart CUDA::cuda_driver MPI::MPI_CXX GTest::gtest_main GTest::gmock_main) +target_include_directories(mp_unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include) +add_subdirectory(mp_unit) +gtest_discover_tests(mp_unit_tests DISCOVERY_MODE PRE_TEST) + # Msccclpp_test add_subdirectory(mscclpp-test) diff --git a/test/mp_unit/CMakeLists.txt b/test/mp_unit/CMakeLists.txt new file mode 100644 index 00000000..78080970 --- /dev/null +++ b/test/mp_unit/CMakeLists.txt @@ -0,0 +1,8 @@ +target_sources(mp_unit_tests PRIVATE + bootstrap_tests.cc + ib_tests.cu + communicator_tests.cu + device_channel_tests.cu + direct_channel_tests.cu + mp_unit_tests.cc +) diff --git a/test/mp_unit/bootstrap_tests.cc b/test/mp_unit/bootstrap_tests.cc new file mode 100644 index 
00000000..caf5b641 --- /dev/null +++ b/test/mp_unit/bootstrap_tests.cc @@ -0,0 +1,136 @@ +#include + +#include "config.hpp" +#include "mp_unit_tests.hpp" + +void BootstrapTest::bootstrapTestAllGather(std::shared_ptr bootstrap) { + std::vector tmp(bootstrap->getNranks(), 0); + tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; + bootstrap->allGather(tmp.data(), sizeof(int)); + for (int i = 0; i < bootstrap->getNranks(); ++i) { + EXPECT_EQ(tmp[i], i + 1); + } +} + +void BootstrapTest::bootstrapTestBarrier(std::shared_ptr bootstrap) { bootstrap->barrier(); } + +void BootstrapTest::bootstrapTestSendRecv(std::shared_ptr bootstrap) { + for (int i = 0; i < bootstrap->getNranks(); i++) { + if (bootstrap->getRank() == i) continue; + int msg1 = (bootstrap->getRank() + 1) * 3; + int msg2 = (bootstrap->getRank() + 1) * 3 + 1; + int msg3 = (bootstrap->getRank() + 1) * 3 + 2; + bootstrap->send(&msg1, sizeof(int), i, 0); + bootstrap->send(&msg2, sizeof(int), i, 1); + bootstrap->send(&msg3, sizeof(int), i, 2); + } + + for (int i = 0; i < bootstrap->getNranks(); i++) { + if (bootstrap->getRank() == i) continue; + int msg1 = 0; + int msg2 = 0; + int msg3 = 0; + // recv them in the opposite order to check correctness + bootstrap->recv(&msg2, sizeof(int), i, 1); + bootstrap->recv(&msg3, sizeof(int), i, 2); + bootstrap->recv(&msg1, sizeof(int), i, 0); + EXPECT_EQ(msg1, (i + 1) * 3); + EXPECT_EQ(msg2, (i + 1) * 3 + 1); + EXPECT_EQ(msg3, (i + 1) * 3 + 2); + } +} + +void BootstrapTest::bootstrapTestAll(std::shared_ptr bootstrap) { + bootstrapTestAllGather(bootstrap); + bootstrapTestBarrier(bootstrap); + bootstrapTestSendRecv(bootstrap); +} + +TEST_F(BootstrapTest, WithId) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id; + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + bootstrap->initialize(id); + bootstrapTestAll(bootstrap); +} + +TEST_F(BootstrapTest, WithIpPortPair) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + bootstrap->initialize(gEnv->args["ip_port"]); + bootstrapTestAll(bootstrap); +} + +TEST_F(BootstrapTest, ResumeWithId) { + for (int i = 0; i < 5; ++i) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id; + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + bootstrap->initialize(id); + } +} + +TEST_F(BootstrapTest, ResumeWithIpPortPair) { + for (int i = 0; i < 5; ++i) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + bootstrap->initialize(gEnv->args["ip_port"]); + } +} + +TEST_F(BootstrapTest, ExitBeforeConnect) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id = bootstrap->createUniqueId(); +} + +TEST_F(BootstrapTest, TimeoutWithId) { + // Set bootstrap timeout to 1 second + mscclpp::Config* cfg = mscclpp::Config::getInstance(); + cfg->setBootstrapConnectionTimeoutConfig(1); + + mscclpp::Timer timer; + + // All ranks initialize a bootstrap with their own id (will hang) + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id = bootstrap->createUniqueId(); + + try { + bootstrap->initialize(id); + } catch (const mscclpp::Error& e) { + ASSERT_EQ(e.getErrorCode(), mscclpp::ErrorCode::Timeout); + } + + // Timeout should be slightly greater than 1 second + ASSERT_GT(timer.elapsed(), 1000000); + ASSERT_LT(timer.elapsed(), 1100000); +} + +class MPIBootstrap : 
public mscclpp::BaseBootstrap { + public: + MPIBootstrap() : BaseBootstrap() {} + int getRank() override { + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + return rank; + } + int getNranks() override { + int worldSize; + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + return worldSize; + } + void allGather(void* sendbuf, int size) override { + MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); + } + void barrier() override { MPI_Barrier(MPI_COMM_WORLD); } + void send(void* sendbuf, int size, int dest, int tag) override { + MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); + } + void recv(void* recvbuf, int size, int source, int tag) override { + MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } +}; + +TEST_F(BootstrapTest, MPIBootstrap) { + auto bootstrap = std::make_shared(); + bootstrapTestAll(bootstrap); +} diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu new file mode 100644 index 00000000..1a7f49ad --- /dev/null +++ b/test/mp_unit/communicator_tests.cu @@ -0,0 +1,276 @@ +#include + +#include +#include + +#include "mp_unit_tests.hpp" + +void CommunicatorTestBase::SetUp() { + MultiProcessTest::SetUp(); + + if (numRanksToUse == -1) { + numRanksToUse = gEnv->worldSize; + } + ASSERT_LE(numRanksToUse, gEnv->worldSize); + + std::shared_ptr bootstrap; + mscclpp::UniqueId id; + if (gEnv->rank < numRanksToUse) { + bootstrap = std::make_shared(gEnv->rank, numRanksToUse); + if (gEnv->rank == 0) id = bootstrap->createUniqueId(); + } + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + + if (gEnv->rank >= numRanksToUse) { + return; + } + bootstrap->initialize(id); + communicator = std::make_shared(bootstrap); + ibTransport = ibIdToTransport(rankToLocalRank(gEnv->rank)); +} + +void CommunicatorTestBase::TearDown() { + connections.clear(); + communicator.reset(); + MultiProcessTest::TearDown(); +} + +void CommunicatorTestBase::setNumRanksToUse(int num) { numRanksToUse = num; } + +int CommunicatorTestBase::rankToLocalRank(int rank) const { return rank % gEnv->nRanksPerNode; } + +int CommunicatorTestBase::rankToNode(int rank) const { return rank / gEnv->nRanksPerNode; } + +void CommunicatorTestBase::connectMesh(bool useIbOnly) { + for (int i = 0; i < numRanksToUse; i++) { + if (i != gEnv->rank) { + if ((rankToNode(i) == rankToNode(gEnv->rank)) && !useIbOnly) { + connections[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc); + } else { + connections[i] = communicator->connectOnSetup(i, 0, ibTransport); + } + } + } + communicator->setup(); +} + +// Register a local memory and receive corresponding remote memories +void CommunicatorTestBase::registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, + const std::vector& remoteRanks, + mscclpp::RegisteredMemory& localMemory, + std::unordered_map& remoteMemories) { + localMemory = communicator->registerMemory(buff, buffSize, transport); + std::unordered_map> futureRemoteMemories; + for (int remoteRank : remoteRanks) { + if (remoteRank != communicator->bootstrapper()->getRank()) { + communicator->sendMemoryOnSetup(localMemory, remoteRank, tag); + futureRemoteMemories[remoteRank] = communicator->recvMemoryOnSetup(remoteRank, tag); + } + } + communicator->setup(); + for (int remoteRank : remoteRanks) { + if (remoteRank != communicator->bootstrapper()->getRank()) { + remoteMemories[remoteRank] = futureRemoteMemories[remoteRank].get(); + } + } +} + +// Register a local memory an 
receive one corresponding remote memory +void CommunicatorTestBase::registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, + int remoteRank, mscclpp::RegisteredMemory& localMemory, + mscclpp::RegisteredMemory& remoteMemory) { + std::vector remoteRanks = {remoteRank}; + std::unordered_map remoteMemories; + registerMemoryPairs(buff, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories); + remoteMemory = remoteMemories[remoteRank]; +} + +void CommunicatorTest::SetUp() { + CommunicatorTestBase::SetUp(); + + ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); + + connectMesh(); + + devicePtr.resize(numBuffers); + localMemory.resize(numBuffers); + remoteMemory.resize(numBuffers); + + std::vector remoteRanks; + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + remoteRanks.push_back(i); + } + } + + for (int n = 0; n < numBuffers; n++) { + devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, + localMemory[n], remoteMemory[n]); + } +} + +void CommunicatorTest::TearDown() { + remoteMemory.clear(); + localMemory.clear(); + devicePtr.clear(); + CommunicatorTestBase::TearDown(); +} + +void CommunicatorTest::deviceBufferInit() { + size_t dataCount = deviceBufferSize / sizeof(int); + for (int n = 0; n < (int)devicePtr.size(); n++) { + std::vector hostBuffer(dataCount, 0); + for (int i = 0; i < dataCount; i++) { + hostBuffer[i] = gEnv->rank + n * gEnv->worldSize; + } + mscclpp::memcpyCuda(devicePtr[n].get(), hostBuffer.data(), dataCount, cudaMemcpyHostToDevice); + } +} + +void CommunicatorTest::writeToRemote(int dataCountPerRank) { + for (int n = 0; n < numBuffers; n++) { + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + auto& conn = connections.at(i); + auto& peerMemory = remoteMemory[n].at(i); + conn->write(peerMemory, gEnv->rank * dataCountPerRank * sizeof(int), localMemory[n], + gEnv->rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int)); + conn->flush(); + } + } + } +} + +bool CommunicatorTest::testWriteCorrectness(bool skipLocal) { + size_t dataCount = deviceBufferSize / sizeof(int); + for (int n = 0; n < (int)devicePtr.size(); n++) { + std::vector hostBuffer(dataCount, 0); + mscclpp::memcpyCuda(hostBuffer.data(), devicePtr[n].get(), dataCount, cudaMemcpyDeviceToHost); + for (int i = 0; i < gEnv->worldSize; i++) { + if (((i / gEnv->nRanksPerNode) == (gEnv->rank / gEnv->nRanksPerNode)) && skipLocal) { + continue; + } + for (int j = i * dataCount / gEnv->worldSize; j < (i + 1) * dataCount / gEnv->worldSize; j++) { + if (hostBuffer[j] != i + n * gEnv->worldSize) { + return false; + } + } + } + } + return true; +} + +TEST_F(CommunicatorTest, BasicWrite) { + if (gEnv->rank >= numRanksToUse) return; + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + communicator->bootstrapper()->barrier(); + + // polling until it becomes ready + bool ready = false; + int niter = 0; + do { + ready = testWriteCorrectness(); + niter++; + if (niter == 10000) { + FAIL() << "Polling is stuck."; + } + } while (!ready); + communicator->bootstrapper()->barrier(); +} + +__global__ void kernelWaitEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { + int tid = threadIdx.x; + if (tid != rank && tid < worldSize) { + deviceEpochs[tid].wait(); + } +} + 
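For reference, a minimal sketch of the buffer layout that writeToRemote() and testWriteCorrectness() above agree on; this is an illustrative addition, not part of the patch, the helper name expectedValueAt is hypothetical, and the world size of 8 is assumed only for the arithmetic.

#include <cstddef>

// Sketch only: each of the numBuffers device buffers is split into worldSize equal
// slices of ints, and rank r fills slice r of every peer's buffer n with the value
// r + n * worldSize. This returns the value testWriteCorrectness() expects at index j.
inline int expectedValueAt(size_t j, int n, int worldSize, size_t dataCount) {
  const size_t dataCountPerRank = dataCount / worldSize;  // size of one rank's slice
  const int writerRank = static_cast<int>(j / dataCountPerRank);
  return writerRank + n * worldSize;
}

With the default deviceBufferSize of 1024 * 1024 bytes, dataCount is 262144 ints; for the assumed worldSize of 8, each slice holds 32768 ints, so for example rank 3 writes indices [98304, 131072) of every buffer and the expected value there is 3 + 8 * n for buffer n.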
+TEST_F(CommunicatorTest, WriteWithDeviceEpochs) { + if (gEnv->rank >= numRanksToUse) return; + + std::unordered_map> epochs; + for (auto entry : connections) { + auto& conn = entry.second; + epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)}); + } + communicator->setup(); + communicator->bootstrapper()->barrier(); + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + auto deviceEpochHandles = mscclpp::allocSharedCuda(gEnv->worldSize); + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle(); + mscclpp::memcpyCuda(deviceEpochHandles.get() + i, &deviceHandle, 1, + cudaMemcpyHostToDevice); + } + } + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + epochs[i]->signal(); + } + } + + kernelWaitEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + ASSERT_TRUE(testWriteCorrectness()); + communicator->bootstrapper()->barrier(); +} + +TEST_F(CommunicatorTest, WriteWithHostEpochs) { + if (gEnv->rank >= numRanksToUse) return; + + std::unordered_map> epochs; + for (auto entry : connections) { + auto& conn = entry.second; + // HostEpoch cannot be used with CudaIpc transport + if (conn->transport() == mscclpp::Transport::CudaIpc) continue; + epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)}); + } + communicator->setup(); + communicator->bootstrapper()->barrier(); + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->signal(); + } + } + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->wait(); + } + } + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->signal(); + } + } + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->wait(); + } + } + + ASSERT_TRUE(testWriteCorrectness()); + communicator->bootstrapper()->barrier(); +} diff --git a/test/mp_unit/device_channel_tests.cu b/test/mp_unit/device_channel_tests.cu new file mode 100644 index 00000000..e1f5ea7c --- /dev/null +++ b/test/mp_unit/device_channel_tests.cu @@ -0,0 +1,138 @@ +#include "mp_unit_tests.hpp" + +void DeviceChannelOneToOneTest::SetUp() { + // Use only two ranks + setNumRanksToUse(2); + CommunicatorTestBase::SetUp(); + channelService = std::make_shared(*communicator.get()); +} + +void DeviceChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } + +void DeviceChannelOneToOneTest::setupMeshConnections(std::vector& devChannels, + bool useIbOnly, void* sendBuff, size_t sendBuffBytes, + void* recvBuff, size_t recvBuffBytes) { + const int rank = communicator->bootstrapper()->getRank(); + const int worldSize = communicator->bootstrapper()->getNranks(); + const bool isInPlace = (recvBuff == nullptr); + mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc | ibTransport; + + connectMesh(useIbOnly); + + for (int r = 0; r < worldSize; r++) { + if (r == 
rank) { + continue; + } + mscclpp::RegisteredMemory sendMemory; + mscclpp::RegisteredMemory remoteMemory; + + if (isInPlace) { + registerMemoryPair(sendBuff, sendBuffBytes, transport, 0, r, sendMemory, remoteMemory); + } else { + sendMemory = communicator->registerMemory(recvBuff, recvBuffBytes, transport); + mscclpp::RegisteredMemory recvMemory; + registerMemoryPair(recvBuff, recvBuffBytes, transport, 0, r, recvMemory, remoteMemory); + } + + mscclpp::channel::ChannelId cid = channelService->addChannel(connections[r]); + communicator->setup(); + + devChannels.emplace_back(channelService->deviceChannel(cid), channelService->addMemory(remoteMemory), + channelService->addMemory(sendMemory)); + } +} + +__constant__ mscclpp::channel::SimpleDeviceChannel gChannelOneToOneTestConstDevChans; + +__global__ void kernelDevicePingPong(int* buff, int rank, int nElem, int* ret) { + mscclpp::channel::SimpleDeviceChannel& devChan = gChannelOneToOneTestConstDevChans; + volatile int* sendBuff = (volatile int*)buff; + int nTries = 1000; + int flusher = 0; + int rank1Offset = 10000000; + for (int i = 0; i < nTries; i++) { + if (rank == 0) { + if (i > 0) { + if (threadIdx.x == 0) devChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != rank1Offset + i - 1 + j) { + // printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j); + *ret = 1; + break; + } + } + } + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int)); + } + if (rank == 1) { + if (threadIdx.x == 0) devChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != i + j) { + // printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); + *ret = 1; + break; + } + } + if (i < nTries - 1) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = rank1Offset + i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int)); + } + } + flusher++; + if (flusher == 100) { + devChan.flush(); + flusher = 0; + } + } +} + +TEST_F(DeviceChannelOneToOneTest, PingPongIb) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 4 * 1024 * 1024; + + std::vector devChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + setupMeshConnections(devChannels, true, buff.get(), nElem * sizeof(int)); + + ASSERT_EQ(devChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDevChans, devChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel))); + + channelService->startProxy(); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + kernelDevicePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelDevicePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelDevicePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelDevicePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); 
+ + channelService->stopProxy(); +} diff --git a/test/mp_unit/direct_channel_tests.cu b/test/mp_unit/direct_channel_tests.cu new file mode 100644 index 00000000..c26543f0 --- /dev/null +++ b/test/mp_unit/direct_channel_tests.cu @@ -0,0 +1,227 @@ +#include "mp_unit_tests.hpp" + +void DirectChannelOneToOneTest::SetUp() { + // Use only two ranks + setNumRanksToUse(2); + CommunicatorTestBase::SetUp(); +} + +void DirectChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } + +void DirectChannelOneToOneTest::setupMeshConnections(std::vector& dirChannels, + void* inputBuff, size_t inputBuffBytes, void* outputBuff, + size_t outputBuffBytes) { + const int rank = communicator->bootstrapper()->getRank(); + const int worldSize = communicator->bootstrapper()->getNranks(); + const bool isInPlace = (outputBuff == nullptr); + mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc | ibTransport; + + mscclpp::RegisteredMemory inputBufRegMem = communicator->registerMemory(inputBuff, inputBuffBytes, transport); + mscclpp::RegisteredMemory outputBufRegMem; + if (!isInPlace) { + outputBufRegMem = communicator->registerMemory(outputBuff, outputBuffBytes, transport); + } + + for (int r = 0; r < worldSize; r++) { + if (r == rank) { + continue; + } + std::shared_ptr conn; + if (rankToNode(r) == rankToNode(gEnv->rank)) { + conn = communicator->connectOnSetup(r, 0, mscclpp::Transport::CudaIpc); + } else { + conn = communicator->connectOnSetup(r, 0, ibTransport); + } + connections[r] = conn; + + if (isInPlace) { + communicator->sendMemoryOnSetup(inputBufRegMem, r, 0); + } else { + communicator->sendMemoryOnSetup(outputBufRegMem, r, 0); + } + auto remoteMemory = communicator->recvMemoryOnSetup(r, 0); + + communicator->setup(); + + directEpochs[r] = std::make_shared(*communicator, conn); + + communicator->setup(); + + dirChannels.emplace_back(directEpochs[r]->deviceHandle(), remoteMemory.get(), inputBufRegMem.data(), + (isInPlace ? 
nullptr : outputBufRegMem.data())); + } +} + +__constant__ mscclpp::channel::DirectChannel gChannelOneToOneTestConstDirChans; + +__global__ void kernelDirectPingPong(int* buff, int rank, int nElem, int* ret) { + mscclpp::channel::DirectChannel& dirChan = gChannelOneToOneTestConstDirChans; + volatile int* sendBuff = (volatile int*)buff; + int nTries = 1000; + int rank1Offset = 10000000; + for (int i = 0; i < nTries; i++) { + if (rank == 0) { + if (i > 0) { + if (threadIdx.x == 0) dirChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != rank1Offset + i - 1 + j) { + // printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j); + *ret = 1; + break; + } + } + } + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = i + j; + } + __syncthreads(); + dirChan.put(0, 0, nElem * sizeof(int), threadIdx.x, blockDim.x); + if (threadIdx.x == 0) dirChan.signal(); + } + if (rank == 1) { + if (threadIdx.x == 0) dirChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != i + j) { + // printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); + *ret = 1; + break; + } + } + if (i < nTries - 1) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = rank1Offset + i + j; + } + __syncthreads(); + dirChan.put(0, 0, nElem * sizeof(int), threadIdx.x, blockDim.x); + if (threadIdx.x == 0) dirChan.signal(); + } + } + } +} + +TEST_F(DirectChannelOneToOneTest, PingPong) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 4 * 1024 * 1024; + + std::vector dirChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + setupMeshConnections(dirChannels, buff.get(), nElem * sizeof(int)); + + ASSERT_EQ(dirChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDirChans, dirChannels.data(), + sizeof(mscclpp::channel::DirectChannel))); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); +} + +__global__ void kernelDirectPacketPingPong(int* buff, int rank, int nElem, int* ret) { + if (rank > 1) return; + + mscclpp::channel::DirectChannel& dirChan = gChannelOneToOneTestConstDirChans; + volatile int* sendBuff = (volatile int*)buff; + int nTries = 1000; + int putOffset = (rank == 0) ? 0 : 10000000; + int getOffset = (rank == 0) ? 10000000 : 0; + for (int i = 0; i < nTries; i++) { + uint64_t flag = (uint64_t)i + 1; + + // rank=0: 0, 1, 0, 1, ... + // rank=1: 1, 0, 1, 0, ... + if ((rank ^ (i & 1)) == 0) { + // If each thread writes 8 bytes at once, we don't need a barrier before putPacket(). 
+ for (int j = threadIdx.x; j < nElem / 2; j += blockDim.x) { + sendBuff[2 * j] = putOffset + i + 2 * j; + sendBuff[2 * j + 1] = putOffset + i + 2 * j + 1; + } + // __syncthreads(); + dirChan.putPacket(0, 0, nElem * sizeof(int), threadIdx.x, blockDim.x, flag); + } else { + dirChan.getPacket(0, 0, nElem * sizeof(int), threadIdx.x, blockDim.x, flag); + // If each thread reads 8 bytes at once, we don't need a barrier after getPacket(). + // __syncthreads(); + for (int j = threadIdx.x; j < nElem / 2; j += blockDim.x) { + if (sendBuff[2 * j] != getOffset + i + 2 * j) { + // printf("ERROR: rank = %d, sendBuff[%d] = %d, expected %d. Skipping following errors\n", rank, 2 * j, + // sendBuff[2 * j], getOffset + i + 2 * j); + *ret = 1; + break; + } + if (sendBuff[2 * j + 1] != getOffset + i + 2 * j + 1) { + // printf("ERROR: rank = %d, sendBuff[%d] = %d, expected %d. Skipping following errors\n", rank, 2 * j + 1, + // sendBuff[2 * j + 1], getOffset + i + 2 * j + 1); + *ret = 1; + break; + } + } + } + // Make sure all threads are done in this iteration + __syncthreads(); + } +} + +TEST_F(DirectChannelOneToOneTest, PacketPingPong) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 4 * 1024 * 1024; + + std::vector dirChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + std::shared_ptr intermBuff = mscclpp::allocSharedCuda(nElem * 2); + setupMeshConnections(dirChannels, buff.get(), nElem * sizeof(int), intermBuff.get(), nElem * 2 * sizeof(int)); + + ASSERT_EQ(dirChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDirChans, dirChannels.data(), + sizeof(mscclpp::channel::DirectChannel))); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + // The least nelem is 2 for packet ping pong + kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 2, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); +} diff --git a/test/mp_unit/ib_tests.cu b/test/mp_unit/ib_tests.cu new file mode 100644 index 00000000..74af8844 --- /dev/null +++ b/test/mp_unit/ib_tests.cu @@ -0,0 +1,287 @@ +#include + +#include + +#include "infiniband/verbs.h" +#include "mp_unit_tests.hpp" + +void IbTestBase::SetUp() { + MSCCLPP_CUDATHROW(cudaGetDeviceCount(&cudaDevNum)); + cudaDevId = (gEnv->rank % gEnv->nRanksPerNode) % cudaDevNum; + MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId)); + + int ibDevId = (gEnv->rank % gEnv->nRanksPerNode) / mscclpp::getIBDeviceCount(); + ibDevName = mscclpp::getIBDeviceName(ibIdToTransport(ibDevId)); +} + +void IbPeerToPeerTest::SetUp() { + IbTestBase::SetUp(); + + mscclpp::UniqueId id; + + if (gEnv->rank < 2) { + // This test needs only two ranks + bootstrap = std::make_shared(gEnv->rank, 2); + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + } + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + if (gEnv->rank >= 2) { + // This test needs only two ranks + return; + } + + bootstrap->initialize(id); + + ibCtx = std::make_shared(ibDevName); + qp = ibCtx->createQp(); + + 
qpInfo[gEnv->rank] = qp->getInfo(); + bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); +} + +void IbPeerToPeerTest::registerBufferAndConnect(void* buf, size_t size) { + bufSize = size; + mr = ibCtx->registerMr(buf, size); + mrInfo[gEnv->rank] = mr->getInfo(); + bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo)); + + for (int i = 0; i < bootstrap->getNranks(); ++i) { + if (i == gEnv->rank) continue; + qp->rtr(qpInfo[i]); + qp->rts(); + break; + } + bootstrap->barrier(); +} + +void IbPeerToPeerTest::stageSend(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled) { + const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 0 : 1]; + qp->stageSend(mr, remoteMrInfo, size, wrId, srcOffset, dstOffset, signaled); +} + +void IbPeerToPeerTest::stageAtomicAdd(uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, uint64_t addVal) { + const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 0 : 1]; + qp->stageAtomicAdd(mr, remoteMrInfo, wrId, dstOffset, addVal); +} + +void IbPeerToPeerTest::stageSendWithImm(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, + bool signaled, unsigned int immData) { + const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 0 : 1]; + qp->stageSendWithImm(mr, remoteMrInfo, size, wrId, srcOffset, dstOffset, signaled, immData); +} + +TEST_F(IbPeerToPeerTest, SimpleSendRecv) { + if (gEnv->rank >= 2) { + // This test needs only two ranks + return; + } + + mscclpp::Timer timeout(3); + + const int maxIter = 100000; + const int nelem = 1; + auto data = mscclpp::allocUniqueCuda(nelem); + + registerBufferAndConnect(data.get(), sizeof(int) * nelem); + + if (gEnv->rank == 1) { + mscclpp::Timer timer; + for (int iter = 0; iter < maxIter; ++iter) { + stageSend(sizeof(int) * nelem, 0, 0, 0, true); + qp->postSend(); + bool waiting = true; + int spin = 0; + while (waiting) { + int wcNum = qp->pollCq(); + ASSERT_GE(wcNum, 0); + for (int i = 0; i < wcNum; ++i) { + const ibv_wc* wc = qp->getWc(i); + EXPECT_EQ(wc->status, IBV_WC_SUCCESS); + waiting = false; + break; + } + if (spin++ > 1000000) { + FAIL() << "Polling is stuck."; + } + } + } + float us = (float)timer.elapsed(); + std::cout << "IbPeerToPeerTest.SimpleSendRecv: " << us / maxIter << " us/iter" << std::endl; + } + bootstrap->barrier(); +} + +__global__ void kernelMemoryConsistency(uint64_t* data, volatile uint64_t* curIter, volatile int* result, + uint64_t nelem, uint64_t maxIter) { + if (blockIdx.x != 0) return; + + constexpr int FlagWrong = 1; + constexpr int FlagAbort = 2; + + volatile uint64_t* ptr = data; + for (uint64_t iter = 1; iter < maxIter + 1; ++iter) { + int err = 0; + + if (threadIdx.x == 0) { + *curIter = iter; + + // Wait for the first element arrival (expect equal to iter). Expect that the first element is delivered in + // a special way that guarantees all other elements are completely delivered. + uint64_t spin = 0; + while (ptr[0] != iter) { + if (spin++ == 1000000) { + // Assume the program is stuck. Set the abort flag and escape the loop. + *result |= FlagAbort; + err = 1; + break; + } + } + } + __syncthreads(); + + // Check results (expect equal to iter) in backward that is more likely to see the wrong result. + for (size_t i = nelem - 1 + threadIdx.x; i >= blockDim.x; i -= blockDim.x) { + if (data[i - blockDim.x] != iter) { +#if 1 + *result |= FlagWrong; + err = 1; + break; +#else + // For debugging purposes: try waiting for the correct result. 
+ uint64_t spin = 0; + while (ptr[i - blockDim.x] != iter) { + if (spin++ == 1000000) { + *result |= FlagAbort; + err = 1; + break; + } + } + if (spin >= 1000000) { + break; + } +#endif + } + } + __threadfence(); + __syncthreads(); + + // Shuffle err + for (int i = 16; i > 0; i /= 2) { + err += __shfl_xor_sync(0xffffffff, err, i); + } + + if (err > 0) { + // Exit if any error is detected. + return; + } + } + if (threadIdx.x == 0) { + *curIter = maxIter + 1; + } +} + +TEST_F(IbPeerToPeerTest, MemoryConsistency) { + if (gEnv->rank >= 2) { + // This test needs only two ranks + return; + } + + const uint64_t signalPeriod = 1024; + const uint64_t maxIter = 10000; + const uint64_t nelem = 65536 + 1; + auto data = mscclpp::allocUniqueCuda(nelem); + + registerBufferAndConnect(data.get(), sizeof(uint64_t) * nelem); + + uint64_t res = 0; + uint64_t iter = 0; + + if (gEnv->rank == 0) { + // Receiver + auto curIter = mscclpp::makeUniqueCudaHost(0); + auto result = mscclpp::makeUniqueCudaHost(0); + + volatile uint64_t* ptrCurIter = (volatile uint64_t*)curIter.get(); + volatile int* ptrResult = (volatile int*)result.get(); + + ASSERT_EQ(*ptrCurIter, 0); + ASSERT_EQ(*ptrResult, 0); + + kernelMemoryConsistency<<<1, 1024>>>(data.get(), ptrCurIter, ptrResult, nelem, maxIter); + MSCCLPP_CUDATHROW(cudaGetLastError()); + + for (iter = 1; iter < maxIter + 1; ++iter) { + mscclpp::Timer timeout(5); + + while (*ptrCurIter != iter + 1) { + res = *ptrResult; + if (res != 0) break; + } + + // Send the result to the sender + res = *ptrResult; + uint64_t tmp[2]; + tmp[0] = res; + bootstrap->allGather(tmp, sizeof(uint64_t)); + + if (res != 0) break; + } + + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + } else if (gEnv->rank == 1) { + // Sender + std::vector hostBuffer(nelem, 0); + + for (iter = 1; iter < maxIter + 1; ++iter) { + mscclpp::Timer timeout(5); + + // Set data + for (uint64_t i = 0; i < nelem; i++) { + hostBuffer[i] = iter; + } + mscclpp::memcpyCuda(data.get(), hostBuffer.data(), nelem, cudaMemcpyHostToDevice); + + // Need to signal from time to time to empty the IB send queue + bool signaled = (iter % signalPeriod == 0); + + // Send from the second element to the last + stageSend(sizeof(uint64_t) * (nelem - 1), 0, sizeof(uint64_t), sizeof(uint64_t), signaled); + qp->postSend(); + +#if 0 + // Send the first element using a normal send. This should occasionally see the wrong result. + stageSend(sizeof(uint64_t), 0, 0, 0, false); + qp->postSend(); +#else + // For reference: send the first element using AtomicAdd. This should see the correct result. + stageAtomicAdd(0, 0, 0, 1); + qp->postSend(); +#endif + + if (signaled) { + int wcNum = qp->pollCq(); + while (wcNum == 0) { + wcNum = qp->pollCq(); + } + ASSERT_EQ(wcNum, 1); + const ibv_wc* wc = qp->getWc(0); + ASSERT_EQ(wc->status, IBV_WC_SUCCESS); + } + + // Get the result from the receiver + uint64_t tmp[2]; + bootstrap->allGather(tmp, sizeof(uint64_t)); + res = tmp[0]; + + if (res != 0) break; + } + } + + if (res & 2) { + FAIL() << "The receiver is stuck at iteration " << iter << "."; + } else if (res != 0 && res != 1) { + FAIL() << "Unknown error is detected at iteration " << iter << ". 
res =" << res; + } + + EXPECT_EQ(res, 0); +} diff --git a/test/mp_unit/mp_unit_tests.cc b/test/mp_unit/mp_unit_tests.cc new file mode 100644 index 00000000..cde8439f --- /dev/null +++ b/test/mp_unit/mp_unit_tests.cc @@ -0,0 +1,83 @@ +#include "mp_unit_tests.hpp" + +#include + +#include + +const char gDefaultIpPort[] = "127.0.0.1:50053"; +MultiProcessTestEnv* gEnv = nullptr; + +mscclpp::Transport ibIdToTransport(int id) { + mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, + mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, + mscclpp::Transport::IB6, mscclpp::Transport::IB7}; + return IBs[id]; +} + +MultiProcessTestEnv::MultiProcessTestEnv(int argc, const char** argv) : argc(argc), argv(argv) {} + +static std::unordered_map parseArgs(int argc, const char* argv[]) { + auto printUsage = [](const char* prog) { + std::stringstream ss; + ss << "Usage: " << prog << " [-ip_port IP:PORT]\n"; + std::cout << ss.str(); + }; + + std::unordered_map options; + + // Default values + options["ip_port"] = gDefaultIpPort; + + // Parse the command line arguments + for (int i = 1; i < argc; i++) { + std::string arg = argv[i]; + if (arg == "-ip_port") { + if (i + 1 < argc) { + options["ip_port"] = argv[++i]; + } else { + throw std::invalid_argument("Error: -ip_port option requires an argument.\n"); + } + } else if (arg == "-help" || arg == "-h") { + printUsage(argv[0]); + exit(0); + } else { + throw std::invalid_argument("Error: Unknown option " + std::string(argv[i]) + "\n"); + } + } + return options; +} + +void MultiProcessTestEnv::SetUp() { + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + // get the local number of nodes with MPI + MPI_Comm shmcomm; + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + int shmrank; + MPI_Comm_size(shmcomm, &shmrank); + nRanksPerNode = shmrank; + MPI_Comm_free(&shmcomm); + + // parse the command line arguments + args = parseArgs(argc, argv); +} + +void MultiProcessTestEnv::TearDown() { MPI_Finalize(); } + +void MultiProcessTest::TearDown() { + // Wait for all ranks to finish the previous test + MPI_Barrier(MPI_COMM_WORLD); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + gEnv = new MultiProcessTestEnv(argc, (const char**)argv); + ::testing::AddGlobalTestEnvironment(gEnv); + return RUN_ALL_TESTS(); +} + +TEST_F(MultiProcessTest, Prelim) { + // Test to make sure the MPI environment is set up correctly + ASSERT_GE(gEnv->worldSize, 2); +} diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp new file mode 100644 index 00000000..b8dc8d7c --- /dev/null +++ b/test/mp_unit/mp_unit_tests.hpp @@ -0,0 +1,144 @@ +#ifndef MSCCLPP_MP_UNIT_TESTS_HPP_ +#define MSCCLPP_MP_UNIT_TESTS_HPP_ + +#include + +#include +#include +#include + +#include "ib.hpp" + +class MultiProcessTestEnv : public ::testing::Environment { + public: + MultiProcessTestEnv(int argc, const char** argv); + + void SetUp(); + void TearDown(); + + const int argc; + const char** argv; + int rank; + int worldSize; + int nRanksPerNode; + std::unordered_map args; +}; + +extern MultiProcessTestEnv* gEnv; + +class MultiProcessTest : public ::testing::Test { + protected: + void TearDown() override; +}; + +class BootstrapTest : public MultiProcessTest { + protected: + void bootstrapTestAllGather(std::shared_ptr bootstrap); + + void bootstrapTestBarrier(std::shared_ptr bootstrap); + + void 
bootstrapTestSendRecv(std::shared_ptr bootstrap); + + void bootstrapTestAll(std::shared_ptr bootstrap); + + // Each test case should finish within 30 seconds. + mscclpp::Timer bootstrapTestTimer{30}; +}; + +class IbTestBase : public MultiProcessTest { + protected: + void SetUp() override; + + int cudaDevNum; + int cudaDevId; + std::string ibDevName; +}; + +class IbPeerToPeerTest : public IbTestBase { + protected: + void SetUp() override; + + void registerBufferAndConnect(void* buf, size_t size); + + void stageSend(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled); + + void stageAtomicAdd(uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, uint64_t addVal); + + void stageSendWithImm(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled, + unsigned int immData); + + std::shared_ptr bootstrap; + std::shared_ptr ibCtx; + mscclpp::IbQp* qp; + const mscclpp::IbMr* mr; + size_t bufSize; + + std::array qpInfo; + std::array mrInfo; +}; + +mscclpp::Transport ibIdToTransport(int id); + +class CommunicatorTestBase : public MultiProcessTest { + protected: + void SetUp() override; + void TearDown() override; + + void setNumRanksToUse(int num); + int rankToLocalRank(int rank) const; + int rankToNode(int rank) const; + void connectMesh(bool useIbOnly = false); + + // Register a local memory and receive corresponding remote memories + void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, + const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, + std::unordered_map& remoteMemories); + // Register a local memory an receive one corresponding remote memory + void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank, + mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory); + + int numRanksToUse = -1; + std::shared_ptr communicator; + mscclpp::Transport ibTransport; + std::unordered_map> connections; +}; + +class CommunicatorTest : public CommunicatorTestBase { + protected: + void SetUp() override; + void TearDown() override; + + void deviceBufferInit(); + void writeToRemote(int dataCountPerRank); + bool testWriteCorrectness(bool skipLocal = false); + + const size_t numBuffers = 10; + const int deviceBufferSize = 1024 * 1024; + std::vector> devicePtr; + std::vector localMemory; + std::vector> remoteMemory; +}; + +class DeviceChannelOneToOneTest : public CommunicatorTestBase { + protected: + void SetUp() override; + void TearDown() override; + + void setupMeshConnections(std::vector& devChannels, bool useIbOnly, + void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + + std::shared_ptr channelService; +}; + +class DirectChannelOneToOneTest : public CommunicatorTestBase { + protected: + void SetUp() override; + void TearDown() override; + + void setupMeshConnections(std::vector& dirChannels, void* inputBuff, + size_t inputBuffBytes, void* outputBuff = nullptr, size_t outputBuffBytes = 0); + + std::unordered_map> directEpochs; +}; + +#endif // MSCCLPP_MP_UNIT_TESTS_HPP_ diff --git a/test/mp_unit_tests.cu b/test/mp_unit_tests.cu deleted file mode 100644 index 29a48fc2..00000000 --- a/test/mp_unit_tests.cu +++ /dev/null @@ -1,976 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "config.hpp" -#include "ib.hpp" -#include "infiniband/verbs.h" - -static const char gDefaultIpPort[] = "127.0.0.1:50053"; - -class 
MultiProcessTestEnv : public ::testing::Environment { - public: - MultiProcessTestEnv(int argc, const char** argv) : argc(argc), argv(argv) {} - - // Override this to define how to set up the environment. - void SetUp() { - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - // get the local number of nodes with MPI - MPI_Comm shmcomm; - MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int shmrank; - MPI_Comm_size(shmcomm, &shmrank); - nRanksPerNode = shmrank; - MPI_Comm_free(&shmcomm); - - // parse the command line arguments - args = parseArgs(argc, argv); - } - - // Override this to define how to tear down the environment. - void TearDown() { MPI_Finalize(); } - - static std::unordered_map parseArgs(int argc, const char* argv[]) { - auto printUsage = [](const char* prog) { - std::stringstream ss; - ss << "Usage: " << prog << " [-ip_port IP:PORT]\n"; - std::cout << ss.str(); - }; - - std::unordered_map options; - - // Default values - options["ip_port"] = gDefaultIpPort; - - // Parse the command line arguments - for (int i = 1; i < argc; i++) { - std::string arg = argv[i]; - if (arg == "-ip_port") { - if (i + 1 < argc) { - options["ip_port"] = argv[++i]; - } else { - throw std::invalid_argument("Error: -ip_port option requires an argument.\n"); - } - } else if (arg == "-help" || arg == "-h") { - printUsage(argv[0]); - exit(0); - } else { - throw std::invalid_argument("Error: Unknown option " + std::string(argv[i]) + "\n"); - } - } - return options; - } - - const int argc; - const char** argv; - int rank; - int worldSize; - int nRanksPerNode; - std::unordered_map args; -}; - -MultiProcessTestEnv* gEnv = nullptr; - -class MultiProcessTest : public ::testing::Test { - protected: - void TearDown() override { - // Wait for all ranks to finish the previous test - MPI_Barrier(MPI_COMM_WORLD); - } -}; - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - gEnv = new MultiProcessTestEnv(argc, (const char**)argv); - ::testing::AddGlobalTestEnvironment(gEnv); - return RUN_ALL_TESTS(); -} - -TEST_F(MultiProcessTest, Prelim) { - // Test to make sure the MPI environment is set up correctly - ASSERT_GE(gEnv->worldSize, 2); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Bootstrap tests -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class BootstrapTest : public MultiProcessTest { - protected: - // Each test case should finish within 30 seconds. 
- mscclpp::Timer bootstrapTestTimer{30}; -}; - -void bootstrapTestAllGather(std::shared_ptr bootstrap) { - std::vector tmp(bootstrap->getNranks(), 0); - tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; - bootstrap->allGather(tmp.data(), sizeof(int)); - for (int i = 0; i < bootstrap->getNranks(); ++i) { - EXPECT_EQ(tmp[i], i + 1); - } -} - -void bootstrapTestBarrier(std::shared_ptr bootstrap) { bootstrap->barrier(); } - -void bootstrapTestSendRecv(std::shared_ptr bootstrap) { - for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) continue; - int msg1 = (bootstrap->getRank() + 1) * 3; - int msg2 = (bootstrap->getRank() + 1) * 3 + 1; - int msg3 = (bootstrap->getRank() + 1) * 3 + 2; - bootstrap->send(&msg1, sizeof(int), i, 0); - bootstrap->send(&msg2, sizeof(int), i, 1); - bootstrap->send(&msg3, sizeof(int), i, 2); - } - - for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) continue; - int msg1 = 0; - int msg2 = 0; - int msg3 = 0; - // recv them in the opposite order to check correctness - bootstrap->recv(&msg2, sizeof(int), i, 1); - bootstrap->recv(&msg3, sizeof(int), i, 2); - bootstrap->recv(&msg1, sizeof(int), i, 0); - EXPECT_EQ(msg1, (i + 1) * 3); - EXPECT_EQ(msg2, (i + 1) * 3 + 1); - EXPECT_EQ(msg3, (i + 1) * 3 + 2); - } -} - -void bootstrapTestAll(std::shared_ptr bootstrap) { - bootstrapTestAllGather(bootstrap); - bootstrapTestBarrier(bootstrap); - bootstrapTestSendRecv(bootstrap); -} - -TEST_F(BootstrapTest, WithId) { - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - bootstrap->initialize(id); - bootstrapTestAll(bootstrap); -} - -TEST_F(BootstrapTest, WithIpPortPair) { - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - bootstrap->initialize(gEnv->args["ip_port"]); - bootstrapTestAll(bootstrap); -} - -TEST_F(BootstrapTest, ResumeWithId) { - for (int i = 0; i < 5; ++i) { - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - bootstrap->initialize(id); - } -} - -TEST_F(BootstrapTest, ResumeWithIpPortPair) { - for (int i = 0; i < 5; ++i) { - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - bootstrap->initialize(gEnv->args["ip_port"]); - } -} - -TEST_F(BootstrapTest, ExitBeforeConnect) { - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - mscclpp::UniqueId id = bootstrap->createUniqueId(); -} - -TEST_F(BootstrapTest, TimeoutWithId) { - // Set bootstrap timeout to 1 second - mscclpp::Config* cfg = mscclpp::Config::getInstance(); - cfg->setBootstrapConnectionTimeoutConfig(1); - - mscclpp::Timer timer; - - // All ranks initialize a bootstrap with their own id (will hang) - auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); - mscclpp::UniqueId id = bootstrap->createUniqueId(); - - try { - bootstrap->initialize(id); - } catch (const mscclpp::Error& e) { - ASSERT_EQ(e.getErrorCode(), mscclpp::ErrorCode::Timeout); - } - - // Timeout should be sligtly greater than 1 second - ASSERT_GT(timer.elapsed(), 1000000); - ASSERT_LT(timer.elapsed(), 1100000); -} - -class MPIBootstrap : public mscclpp::BaseBootstrap { - public: - MPIBootstrap() : BaseBootstrap() {} - int getRank() override { - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - return rank; 
- } - int getNranks() override { - int worldSize; - MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - return worldSize; - } - void allGather(void* sendbuf, int size) override { - MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); - } - void barrier() override { MPI_Barrier(MPI_COMM_WORLD); } - void send(void* sendbuf, int size, int dest, int tag) override { - MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); - } - void recv(void* recvbuf, int size, int source, int tag) override { - MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } -}; - -TEST_F(BootstrapTest, MPIBootstrap) { - auto bootstrap = std::make_shared(); - bootstrapTestAll(bootstrap); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// InfiniBand tests -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -static mscclpp::Transport ibIdToTransport(int id) { - mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, - mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, - mscclpp::Transport::IB6, mscclpp::Transport::IB7}; - return IBs[id]; -} - -class IbTestBase : public MultiProcessTest { - protected: - void SetUp() override { - MSCCLPP_CUDATHROW(cudaGetDeviceCount(&cudaDevNum)); - cudaDevId = (gEnv->rank % gEnv->nRanksPerNode) % cudaDevNum; - MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId)); - - int ibDevId = (gEnv->rank % gEnv->nRanksPerNode) / mscclpp::getIBDeviceCount(); - ibDevName = mscclpp::getIBDeviceName(ibIdToTransport(ibDevId)); - } - - int cudaDevNum; - int cudaDevId; - std::string ibDevName; -}; - -class IbPeerToPeerTest : public IbTestBase { - protected: - void SetUp() override { - IbTestBase::SetUp(); - - mscclpp::UniqueId id; - - if (gEnv->rank < 2) { - // This test needs only two ranks - bootstrap = std::make_shared(gEnv->rank, 2); - if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); - } - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - if (gEnv->rank >= 2) { - // This test needs only two ranks - return; - } - - bootstrap->initialize(id); - - ibCtx = std::make_shared(ibDevName); - qp = ibCtx->createQp(); - - qpInfo[gEnv->rank] = qp->getInfo(); - bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); - } - - void registerBufferAndConnect(void* buf, size_t size) { - bufSize = size; - mr = ibCtx->registerMr(buf, size); - mrInfo[gEnv->rank] = mr->getInfo(); - bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo)); - - for (int i = 0; i < bootstrap->getNranks(); ++i) { - if (i == gEnv->rank) continue; - qp->rtr(qpInfo[i]); - qp->rts(); - break; - } - bootstrap->barrier(); - } - - void stageSend(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled) { - const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 0 : 1]; - qp->stageSend(mr, remoteMrInfo, size, wrId, srcOffset, dstOffset, signaled); - } - - void stageAtomicAdd(uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, uint64_t addVal) { - const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 0 : 1]; - qp->stageAtomicAdd(mr, remoteMrInfo, wrId, dstOffset, addVal); - } - - void stageSendWithImm(uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled, - unsigned int immData) { - const mscclpp::IbMrInfo& remoteMrInfo = mrInfo[(gEnv->rank == 1) ? 
0 : 1]; - qp->stageSendWithImm(mr, remoteMrInfo, size, wrId, srcOffset, dstOffset, signaled, immData); - } - - std::shared_ptr bootstrap; - std::shared_ptr ibCtx; - mscclpp::IbQp* qp; - const mscclpp::IbMr* mr; - size_t bufSize; - - std::array qpInfo; - std::array mrInfo; -}; - -TEST_F(IbPeerToPeerTest, SimpleSendRecv) { - if (gEnv->rank >= 2) { - // This test needs only two ranks - return; - } - - mscclpp::Timer timeout(3); - - const int maxIter = 100000; - const int nelem = 1; - auto data = mscclpp::allocUniqueCuda(nelem); - - registerBufferAndConnect(data.get(), sizeof(int) * nelem); - - if (gEnv->rank == 1) { - mscclpp::Timer timer; - for (int iter = 0; iter < maxIter; ++iter) { - stageSend(sizeof(int) * nelem, 0, 0, 0, true); - qp->postSend(); - bool waiting = true; - int spin = 0; - while (waiting) { - int wcNum = qp->pollCq(); - ASSERT_GE(wcNum, 0); - for (int i = 0; i < wcNum; ++i) { - const ibv_wc* wc = qp->getWc(i); - EXPECT_EQ(wc->status, IBV_WC_SUCCESS); - waiting = false; - break; - } - if (spin++ > 1000000) { - FAIL() << "Polling is stuck."; - } - } - } - float us = (float)timer.elapsed(); - std::cout << "IbPeerToPeerTest.SimpleSendRecv: " << us / maxIter << " us/iter" << std::endl; - } - bootstrap->barrier(); -} - -__global__ void kernelMemoryConsistency(uint64_t* data, volatile uint64_t* curIter, volatile int* result, - uint64_t nelem, uint64_t maxIter) { - if (blockIdx.x != 0) return; - - constexpr int FlagWrong = 1; - constexpr int FlagAbort = 2; - - volatile uint64_t* ptr = data; - for (uint64_t iter = 1; iter < maxIter + 1; ++iter) { - int err = 0; - - if (threadIdx.x == 0) { - *curIter = iter; - - // Wait for the first element arrival (expect equal to iter). Expect that the first element is delivered in - // a special way that guarantees all other elements are completely delivered. - uint64_t spin = 0; - while (ptr[0] != iter) { - if (spin++ == 1000000) { - // Assume the program is stuck. Set the abort flag and escape the loop. - *result |= FlagAbort; - err = 1; - break; - } - } - } - __syncthreads(); - - // Check results (expect equal to iter) in backward that is more likely to see the wrong result. - for (size_t i = nelem - 1 + threadIdx.x; i >= blockDim.x; i -= blockDim.x) { - if (data[i - blockDim.x] != iter) { -#if 1 - *result |= FlagWrong; - err = 1; - break; -#else - // For debugging purposes: try waiting for the correct result. - uint64_t spin = 0; - while (ptr[i - blockDim.x] != iter) { - if (spin++ == 1000000) { - *result |= FlagAbort; - err = 1; - break; - } - } - if (spin >= 1000000) { - break; - } -#endif - } - } - __threadfence(); - __syncthreads(); - - // Shuffle err - for (int i = 16; i > 0; i /= 2) { - err += __shfl_xor_sync(0xffffffff, err, i); - } - - if (err > 0) { - // Exit if any error is detected. 
- return; - } - } - if (threadIdx.x == 0) { - *curIter = maxIter + 1; - } -} - -TEST_F(IbPeerToPeerTest, MemoryConsistency) { - if (gEnv->rank >= 2) { - // This test needs only two ranks - return; - } - - const uint64_t signalPeriod = 1024; - const uint64_t maxIter = 10000; - const uint64_t nelem = 65536 + 1; - auto data = mscclpp::allocUniqueCuda(nelem); - - registerBufferAndConnect(data.get(), sizeof(uint64_t) * nelem); - - uint64_t res = 0; - uint64_t iter = 0; - - if (gEnv->rank == 0) { - // Receiver - auto curIter = mscclpp::makeUniqueCudaHost(0); - auto result = mscclpp::makeUniqueCudaHost(0); - - volatile uint64_t* ptrCurIter = (volatile uint64_t*)curIter.get(); - volatile int* ptrResult = (volatile int*)result.get(); - - ASSERT_EQ(*ptrCurIter, 0); - ASSERT_EQ(*ptrResult, 0); - - kernelMemoryConsistency<<<1, 1024>>>(data.get(), ptrCurIter, ptrResult, nelem, maxIter); - MSCCLPP_CUDATHROW(cudaGetLastError()); - - for (iter = 1; iter < maxIter + 1; ++iter) { - mscclpp::Timer timeout(5); - - while (*ptrCurIter != iter + 1) { - res = *ptrResult; - if (res != 0) break; - } - - // Send the result to the sender - res = *ptrResult; - uint64_t tmp[2]; - tmp[0] = res; - bootstrap->allGather(tmp, sizeof(uint64_t)); - - if (res != 0) break; - } - - MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); - } else if (gEnv->rank == 1) { - // Sender - std::vector hostBuffer(nelem, 0); - - for (iter = 1; iter < maxIter + 1; ++iter) { - mscclpp::Timer timeout(5); - - // Set data - for (uint64_t i = 0; i < nelem; i++) { - hostBuffer[i] = iter; - } - mscclpp::memcpyCuda(data.get(), hostBuffer.data(), nelem, cudaMemcpyHostToDevice); - - // Need to signal from time to time to empty the IB send queue - bool signaled = (iter % signalPeriod == 0); - - // Send from the second element to the last - stageSend(sizeof(uint64_t) * (nelem - 1), 0, sizeof(uint64_t), sizeof(uint64_t), signaled); - qp->postSend(); - -#if 0 - // Send the first element using a normal send. This should occasionally see the wrong result. - stageSend(sizeof(uint64_t), 0, 0, 0, false); - qp->postSend(); -#else - // For reference: send the first element using AtomicAdd. This should see the correct result. - stageAtomicAdd(0, 0, 0, 1); - qp->postSend(); -#endif - - if (signaled) { - int wcNum = qp->pollCq(); - while (wcNum == 0) { - wcNum = qp->pollCq(); - } - ASSERT_EQ(wcNum, 1); - const ibv_wc* wc = qp->getWc(0); - ASSERT_EQ(wc->status, IBV_WC_SUCCESS); - } - - // Get the result from the receiver - uint64_t tmp[2]; - bootstrap->allGather(tmp, sizeof(uint64_t)); - res = tmp[0]; - - if (res != 0) break; - } - } - - if (res & 2) { - FAIL() << "The receiver is stuck at iteration " << iter << "."; - } else if (res != 0 && res != 1) { - FAIL() << "Unknown error is detected at iteration " << iter << ". 
-  }
-
-  EXPECT_EQ(res, 0);
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Communicator tests
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-class CommunicatorTestBase : public MultiProcessTest {
- protected:
-  void SetUp() override {
-    MultiProcessTest::SetUp();
-
-    if (numRanksToUse == -1) {
-      numRanksToUse = gEnv->worldSize;
-    }
-    ASSERT_LE(numRanksToUse, gEnv->worldSize);
-
-    std::shared_ptr bootstrap;
-    mscclpp::UniqueId id;
-    if (gEnv->rank < numRanksToUse) {
-      bootstrap = std::make_shared(gEnv->rank, numRanksToUse);
-      if (gEnv->rank == 0) id = bootstrap->createUniqueId();
-    }
-    MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
-
-    if (gEnv->rank >= numRanksToUse) {
-      return;
-    }
-    bootstrap->initialize(id);
-    communicator = std::make_shared(bootstrap);
-    ibTransport = ibIdToTransport(rankToLocalRank(gEnv->rank));
-  }
-
-  void TearDown() override {
-    connections.clear();
-    communicator.reset();
-    MultiProcessTest::TearDown();
-  }
-
-  void setNumRanksToUse(int num) { numRanksToUse = num; }
-
-  int rankToLocalRank(int rank) const { return rank % gEnv->nRanksPerNode; }
-
-  int rankToNode(int rank) const { return rank / gEnv->nRanksPerNode; }
-
-  void connectMesh(bool useIbOnly = false) {
-    for (int i = 0; i < numRanksToUse; i++) {
-      if (i != gEnv->rank) {
-        if ((rankToNode(i) == rankToNode(gEnv->rank)) && !useIbOnly) {
-          connections[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc);
-        } else {
-          connections[i] = communicator->connectOnSetup(i, 0, ibTransport);
-        }
-      }
-    }
-    communicator->setup();
-  }
-
-  // Register a local memory and receive corresponding remote memories
-  void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag,
-                           const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory,
-                           std::unordered_map& remoteMemories) {
-    localMemory = communicator->registerMemory(buff, buffSize, transport);
-    std::unordered_map> futureRemoteMemories;
-    for (int remoteRank : remoteRanks) {
-      if (remoteRank != communicator->bootstrapper()->getRank()) {
-        communicator->sendMemoryOnSetup(localMemory, remoteRank, tag);
-        futureRemoteMemories[remoteRank] = communicator->recvMemoryOnSetup(remoteRank, tag);
-      }
-    }
-    communicator->setup();
-    for (int remoteRank : remoteRanks) {
-      if (remoteRank != communicator->bootstrapper()->getRank()) {
-        remoteMemories[remoteRank] = futureRemoteMemories[remoteRank].get();
-      }
-    }
-  }
-
-  // Register a local memory an receive one corresponding remote memory
-  void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank,
-                          mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory) {
-    std::vector remoteRanks = {remoteRank};
-    std::unordered_map remoteMemories;
-    registerMemoryPairs(buff, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories);
-    remoteMemory = remoteMemories[remoteRank];
-  }
-
-  int numRanksToUse = -1;
-  std::shared_ptr communicator;
-  mscclpp::Transport ibTransport;
-  std::unordered_map> connections;
-};
-
-class CommunicatorTest : public CommunicatorTestBase {
- protected:
-  void SetUp() override {
-    CommunicatorTestBase::SetUp();
-
-    ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0);
-
-    connectMesh();
-
-    devicePtr.resize(numBuffers);
-    localMemory.resize(numBuffers);
-    remoteMemory.resize(numBuffers);
-
-    std::vector remoteRanks;
-    for (int i = 0; i < gEnv->worldSize; i++) {
-      if (i != gEnv->rank) {
-        remoteRanks.push_back(i);
-      }
-    }
-
-    for (int n = 0; n < numBuffers; n++) {
-      devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int));
-      registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0,
-                          remoteRanks, localMemory[n], remoteMemory[n]);
-    }
-  }
-
-  void TearDown() override {
-    remoteMemory.clear();
-    localMemory.clear();
-    devicePtr.clear();
-    CommunicatorTestBase::TearDown();
-  }
-
-  void deviceBufferInit() {
-    size_t dataCount = deviceBufferSize / sizeof(int);
-    for (int n = 0; n < (int)devicePtr.size(); n++) {
-      std::vector hostBuffer(dataCount, 0);
-      for (int i = 0; i < dataCount; i++) {
-        hostBuffer[i] = gEnv->rank + n * gEnv->worldSize;
-      }
-      mscclpp::memcpyCuda(devicePtr[n].get(), hostBuffer.data(), dataCount, cudaMemcpyHostToDevice);
-    }
-  }
-
-  void writeToRemote(int dataCountPerRank) {
-    for (int n = 0; n < numBuffers; n++) {
-      for (int i = 0; i < gEnv->worldSize; i++) {
-        if (i != gEnv->rank) {
-          auto& conn = connections.at(i);
-          auto& peerMemory = remoteMemory[n].at(i);
-          conn->write(peerMemory, gEnv->rank * dataCountPerRank * sizeof(int), localMemory[n],
-                      gEnv->rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int));
-          conn->flush();
-        }
-      }
-    }
-  }
-
-  bool testWriteCorrectness(bool skipLocal = false) {
-    size_t dataCount = deviceBufferSize / sizeof(int);
-    for (int n = 0; n < (int)devicePtr.size(); n++) {
-      std::vector hostBuffer(dataCount, 0);
-      mscclpp::memcpyCuda(hostBuffer.data(), devicePtr[n].get(), dataCount, cudaMemcpyDeviceToHost);
-      for (int i = 0; i < gEnv->worldSize; i++) {
-        if (((i / gEnv->nRanksPerNode) == (gEnv->rank / gEnv->nRanksPerNode)) && skipLocal) {
-          continue;
-        }
-        for (int j = i * dataCount / gEnv->worldSize; j < (i + 1) * dataCount / gEnv->worldSize; j++) {
-          if (hostBuffer[j] != i + n * gEnv->worldSize) {
-            return false;
-          }
-        }
-      }
-    }
-    return true;
-  }
-
-  const size_t numBuffers = 10;
-  const int deviceBufferSize = 1024 * 1024;
-  std::vector> devicePtr;
-  std::vector localMemory;
-  std::vector> remoteMemory;
-};
-
-TEST_F(CommunicatorTest, BasicWrite) {
-  if (gEnv->rank >= numRanksToUse) return;
-
-  deviceBufferInit();
-  communicator->bootstrapper()->barrier();
-
-  writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
-  communicator->bootstrapper()->barrier();
-
-  // polling until it becomes ready
-  bool ready = false;
-  int niter = 0;
-  do {
-    ready = testWriteCorrectness();
-    niter++;
-    if (niter == 10000) {
-      FAIL() << "Polling is stuck.";
-    }
-  } while (!ready);
-  communicator->bootstrapper()->barrier();
-}
-
-__global__ void kernelWaitEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
-  int tid = threadIdx.x;
-  if (tid != rank && tid < worldSize) {
-    deviceEpochs[tid].wait();
-  }
-}
-
-TEST_F(CommunicatorTest, WriteWithDeviceEpochs) {
-  if (gEnv->rank >= numRanksToUse) return;
-
-  std::unordered_map> epochs;
-  for (auto entry : connections) {
-    auto& conn = entry.second;
-    epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)});
-  }
-  communicator->setup();
-  communicator->bootstrapper()->barrier();
-
-  deviceBufferInit();
-  communicator->bootstrapper()->barrier();
-
-  auto deviceEpochHandles = mscclpp::allocSharedCuda(gEnv->worldSize);
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank) {
-      mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle();
-      mscclpp::memcpyCuda(deviceEpochHandles.get() + i, &deviceHandle, 1,
-                          cudaMemcpyHostToDevice);
-    }
-  }
-  communicator->bootstrapper()->barrier();
-
-  writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
-
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank) {
-      epochs[i]->signal();
-    }
-  }
-
-  kernelWaitEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize);
-  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
-
-  ASSERT_TRUE(testWriteCorrectness());
-  communicator->bootstrapper()->barrier();
-}
-
-TEST_F(CommunicatorTest, WriteWithHostEpochs) {
-  if (gEnv->rank >= numRanksToUse) return;
-
-  std::unordered_map> epochs;
-  for (auto entry : connections) {
-    auto& conn = entry.second;
-    // HostEpoch cannot be used with CudaIpc transport
-    if (conn->transport() == mscclpp::Transport::CudaIpc) continue;
-    epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)});
-  }
-  communicator->setup();
-  communicator->bootstrapper()->barrier();
-
-  deviceBufferInit();
-  communicator->bootstrapper()->barrier();
-
-  writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
-
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
-      epochs[i]->signal();
-    }
-  }
-
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
-      epochs[i]->wait();
-    }
-  }
-
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
-      epochs[i]->signal();
-    }
-  }
-
-  for (int i = 0; i < gEnv->worldSize; i++) {
-    if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
-      epochs[i]->wait();
-    }
-  }
-
-  ASSERT_TRUE(testWriteCorrectness());
-  communicator->bootstrapper()->barrier();
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Channel tests
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-class ChannelOneToOneTest : public CommunicatorTestBase {
- protected:
-  void SetUp() override {
-    // Use only two ranks
-    setNumRanksToUse(2);
-    CommunicatorTestBase::SetUp();
-    channelService = std::make_shared(*communicator.get());
-  }
-
-  void TearDown() override { CommunicatorTestBase::TearDown(); }
-
-  void setupMeshConnections(std::vector& devChannels, bool useIbOnly,
-                            void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0) {
-    const int rank = communicator->bootstrapper()->getRank();
-    const int worldSize = communicator->bootstrapper()->getNranks();
-    const bool isInPlace = (recvBuff == nullptr);
-    mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc | ibTransport;
-
-    connectMesh(useIbOnly);
-
-    for (int r = 0; r < worldSize; r++) {
-      if (r == rank) {
-        continue;
-      }
-      mscclpp::RegisteredMemory sendMemory;
-      mscclpp::RegisteredMemory remoteMemory;
-      // void* tmpBuff = nullptr;
-
-      if (isInPlace) {
-        registerMemoryPair(sendBuff, sendBuffBytes, transport, 0, r, sendMemory, remoteMemory);
-      } else {
-        sendMemory = communicator->registerMemory(recvBuff, recvBuffBytes, transport);
-        mscclpp::RegisteredMemory recvMemory;
-        registerMemoryPair(recvBuff, recvBuffBytes, transport, 0, r, recvMemory, remoteMemory);
-        // tmpBuff = recvMemory.data();
-      }
-
-      mscclpp::channel::ChannelId cid = channelService->addChannel(connections[r]);
-      communicator->setup();
-
-      devChannels.emplace_back(channelService->deviceChannel(cid), channelService->addMemory(remoteMemory),
-                               channelService->addMemory(sendMemory));
-    }
-  }
-
-  std::shared_ptr channelService;
-};
-
-__constant__ mscclpp::channel::SimpleDeviceChannel gChannelOneToOneTestConstDevChans;
-
-__global__ void kernelPingPong(int* buff, int rank, int nElem) {
-  mscclpp::channel::SimpleDeviceChannel& devChan = gChannelOneToOneTestConstDevChans;
-  volatile int* sendBuff = (volatile int*)buff;
-  int nTries = 1000;
-  int flusher = 0;
-  int rank1Offset = 10000000;
-  for (int i = 0; i < nTries; i++) {
-    if (rank == 0) {
-      if (i > 0) {
-        if (threadIdx.x == 0) devChan.wait();
-        __syncthreads();
-        for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
-          if (sendBuff[j] != rank1Offset + i - 1 + j) {
-            printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], 100000 + i - 1 + j);
-          }
-        }
-      }
-      for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
-        sendBuff[j] = i + j;
-      }
-      __syncthreads();
-      // __threadfence_system(); // not necessary if we make sendBuff volatile
-      if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int));
-    }
-    if (rank == 1) {
-      if (threadIdx.x == 0) devChan.wait();
-      __syncthreads();
-      for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
-        if (sendBuff[j] != i + j) {
-          printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j);
-        }
-      }
-      if (i < nTries - 1) {
-        for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
-          sendBuff[j] = rank1Offset + i + j;
-        }
-        __syncthreads();
-        // __threadfence_system(); // not necessary if we make sendBuff volatile
-        if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int));
-      }
-    }
-    flusher++;
-    if (flusher == 100) {
-      devChan.flush();
-      flusher = 0;
-    }
-  }
-}
-
-TEST_F(ChannelOneToOneTest, PingPongIb) {
-  if (gEnv->rank >= numRanksToUse) return;
-
-  const int nElem = 4 * 1024 * 1024;
-
-  std::vector devChannels;
-  std::shared_ptr buff = mscclpp::allocSharedCuda(nElem);
-  setupMeshConnections(devChannels, true, buff.get(), nElem * sizeof(int));
-
-  ASSERT_EQ(devChannels.size(), 1);
-  MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDevChans, devChannels.data(),
-                                       sizeof(mscclpp::channel::SimpleDeviceChannel)));
-
-  channelService->startProxy();
-
-  kernelPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1);
-  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
-
-  kernelPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024);
-  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
-
-  kernelPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024);
-  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
-
-  kernelPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024);
-  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
-
-  channelService->stopProxy();
-}