From 961f5b38ddf1cfe5eebfef40c4d5b81defb6daa4 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Tue, 2 May 2023 00:44:13 +0000 Subject: [PATCH] more debbuging info + testing 1000 memory registerations --- src/connection.cc | 3 + tests/communicator_test_cpp.cc | 177 ++++++++++++++++++++------------- 2 files changed, 109 insertions(+), 71 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 2cfa7205..e0c52419 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -61,6 +61,8 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register char* srcPtr = (char*)src.data(); CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream)); + INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size); + // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size); } @@ -114,6 +116,7 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem qp->stageSend(srcMr, dstMrInfo, (uint32_t)size, /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, /*signaled=*/true); qp->postSend(); + INFO(MSCCLPP_NET, "IBConnection write: from %p to %p, size %lu", (uint8_t*)srcMr->getBuff() + srcOffset, (uint8_t*)dstMrInfo.addr + dstOffset, size); // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size); } diff --git a/tests/communicator_test_cpp.cc b/tests/communicator_test_cpp.cc index 78bffaac..6f7aa3e1 100644 --- a/tests/communicator_test_cpp.cc +++ b/tests/communicator_test_cpp.cc @@ -23,6 +23,55 @@ mscclpp::Transport findIb(int localRank) return IBs[localRank]; } +void register_all_memories(std::unique_ptr& communicator, int rank, int worldSize, void* devicePtr, size_t deviceBufferSize, mscclpp::Transport myIbDevice, mscclpp::RegisteredMemory& localMemory, std::unordered_map& remoteMemory){ + localMemory = communicator->registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice); + int serializedSize = 0; + for (int i = 0; i < worldSize; i++) { + if (i != rank){ + auto serialized = localMemory.serialize(); + serializedSize = serialized.size(); + communicator->bootstrapper()->send(serialized.data(), serializedSize, i, 0); + } + } + if (serializedSize == 0) { + throw std::runtime_error("Serialized size should have been set to a non-zero value."); + } + for (int i = 0; i < worldSize; i++) { + if (i != rank){ + std::vector deserialized(serializedSize); + communicator->bootstrapper()->recv(deserialized.data(), serializedSize, i, 0); + auto remote = mscclpp::RegisteredMemory::deserialize(deserialized); + remoteMemory[i] = remote; + } + } +} + +void make_connections(std::unique_ptr& communicator, int rank, int worldSize, int nRanksPerNode, mscclpp::Transport myIbDevice, std::unordered_map>& connections){ + for (int i = 0; i < worldSize; i++) { + if (i != rank){ + if (i / nRanksPerNode == rank / nRanksPerNode) { + connections[i] = communicator->connect(i, 0, mscclpp::Transport::CudaIpc); + } else { + connections[i] = communicator->connect(i, 0, myIbDevice); + } + } + } + communicator->connectionSetup(); +} + +void write_remote(int rank, int worldSize, std::unordered_map>& connections, std::unordered_map& remoteRegisteredMemories, mscclpp::RegisteredMemory& registeredMemory, int writeSize){ + for (int i = 0; i < worldSize; i++) { + if (i != rank) { + auto& conn = connections.at(i); + auto& peerMemory = remoteRegisteredMemories.at(i); + // printf("write to rank: %d, rank is %d\n", peerMemory.rank(), rank); + conn->write(peerMemory, rank * writeSize, registeredMemory, rank * writeSize, writeSize); + conn->flush(); + } + } + +} + void test_communicator(int rank, int worldSize, int nranksPerNode) { auto bootstrap = std::make_shared(rank, worldSize); @@ -32,104 +81,90 @@ void test_communicator(int rank, int worldSize, int nranksPerNode) MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); bootstrap->initialize(id); - auto communicator = std::make_shared(bootstrap); + auto communicator = std::make_unique(bootstrap); if (bootstrap->getRank() == 0) std::cout << "Communicator initialization passed" << std::endl; std::unordered_map> connections; auto myIbDevice = findIb(rank % nranksPerNode); - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - std::shared_ptr conn; - if (i / nranksPerNode == rank / nranksPerNode) { - conn = communicator->connect(i, 0, mscclpp::Transport::CudaIpc); - } else { - conn = communicator->connect(i, 0, myIbDevice); - } - connections[i] = conn; - } - } - communicator->connectionSetup(); + make_connections(communicator, rank, worldSize, nranksPerNode, myIbDevice, connections); if (bootstrap->getRank() == 0) std::cout << "Connection setup passed" << std::endl; - int* devicePtr; - int size = 1024; - CUDATHROW(cudaMalloc(&devicePtr, size)); - auto registeredMemory = communicator->registerMemory(devicePtr, size, mscclpp::Transport::CudaIpc | myIbDevice); + int numBuffers = 1000; + std::vector devicePtr(numBuffers); + int deviceBufferSize = 1024*1024; + + std::vector localMemory(numBuffers); + std::vector> remoteMemory(numBuffers); - for (int i = 0; i < worldSize; i++) { - if (i != rank){ - auto serialized = registeredMemory.serialize(); - int serializedSize = serialized.size(); - bootstrap->send(&serializedSize, sizeof(int), i, 0); - bootstrap->send(serialized.data(), serializedSize, i, 1); - } + for (int n = 0; n < numBuffers; n++) { + if (n % 100 == 0) + std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl; + CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize)); + register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n], remoteMemory[n]); } - std::unordered_map registeredMemories; - for (int i = 0; i < worldSize; i++) { - if (i != rank){ - int deserializedSize; - bootstrap->recv(&deserializedSize, sizeof(int), i, 0); - std::vector deserialized(deserializedSize); - bootstrap->recv(deserialized.data(), deserializedSize, i, 1); - auto deserializedRegisteredMemory = mscclpp::RegisteredMemory::deserialize(deserialized); - registeredMemories.insert({deserializedRegisteredMemory.rank(), deserializedRegisteredMemory}); - } - } - bootstrap->barrier(); if (bootstrap->getRank() == 0) - std::cout << "Memory registration passed" << std::endl; + std::cout << "Memory registration for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - assert((size / sizeof(int)) % worldSize == 0); - size_t writeSize = size / worldSize; - size_t dataCount = size / sizeof(int); - // std::vector hostBuffer(dataCount, 0); - std::shared_ptr hostBuffer(new int[dataCount]); - for (int i = 0; i < dataCount; i++) { - hostBuffer[i] = rank; + + assert((deviceBufferSize / sizeof(int)) % worldSize == 0); + size_t writeSize = deviceBufferSize / worldSize; + size_t dataCount = deviceBufferSize / sizeof(int); + for (int n = 0; n < numBuffers; n++){ + std::vector hostBuffer(dataCount, 0); + for (int i = 0; i < dataCount; i++) { + hostBuffer[i] = rank + n * worldSize; + } + CUDATHROW(cudaMemcpy(devicePtr[n], hostBuffer.data(), deviceBufferSize, cudaMemcpyHostToDevice)); } - CUDATHROW(cudaMemcpy(devicePtr, hostBuffer.get(), size, cudaMemcpyHostToDevice)); CUDATHROW(cudaDeviceSynchronize()); bootstrap->barrier(); - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - auto& conn = connections.at(i); - auto& peerMemory = registeredMemories.at(i); - // printf("write to rank: %d, rank is %d\n", peerMemory.rank(), rank); - conn->write(peerMemory, rank * writeSize, registeredMemory, rank * writeSize, writeSize); - conn->flush(); - } + if (bootstrap->getRank() == 0) + std::cout << "CUDA memory initialization passed" << std::endl; + + for (int n = 0; n < numBuffers; n++){ + write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], writeSize); } bootstrap->barrier(); - // polling until it becomes ready - bool ready = false; - int niter = 0; - do { - ready = true; - CUDATHROW(cudaMemcpy(hostBuffer.get(), devicePtr, size, cudaMemcpyDeviceToHost)); - size_t dataPerRank = writeSize / sizeof(int); - for (int i = 0; i < dataCount; i++) { - if (hostBuffer[i] != i / dataPerRank) { - ready = false; + if (bootstrap->getRank() == 0) + std::cout << "RDMA write for " << std::to_string(numBuffers) << " buffers passed" << std::endl; + + for (int n = 0; n < numBuffers; n++){ + // polling until it becomes ready + bool ready = false; + int niter = 0; + std::vector hostBuffer(dataCount, 0); + do { + ready = true; + CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], deviceBufferSize, cudaMemcpyDeviceToHost)); + for (int i = 0; i < worldSize; i++) { + for (int j = i*writeSize/sizeof(int); j < (i+1)*writeSize/sizeof(int); j++) { + if (hostBuffer[j] != i + n * worldSize) { + ready = false; + } + } } - } - if (niter == 10000){ - throw std::runtime_error("Polling is stuck."); - } - niter++; - } while (!ready); + if (niter == 10000){ + throw std::runtime_error("Polling is stuck."); + } + niter++; + } while (!ready); + } bootstrap->barrier(); if (bootstrap->getRank() == 0) - std::cout << "Connection write passed" << std::endl; + std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - CUDATHROW(cudaFree(devicePtr)); if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl; + + for (int n = 0; n < numBuffers; n++){ + CUDATHROW(cudaFree(devicePtr[n])); + } } int main(int argc, char** argv)