diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 3b307fca..6029c364 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -232,6 +232,12 @@ struct ProxyChannel { fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + fifo_.push( + ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dst The destination memory region. /// @param src The source memory region. @@ -241,6 +247,11 @@ struct ProxyChannel { putWithSignal(dst, offset, src, offset, size); } + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, + uint32_t height) { + put2DWithSignal(dst, offset, src, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dst The destination memory region. /// @param dstOffset The offset into the destination memory region. @@ -337,11 +348,20 @@ struct SimpleProxyChannel { proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } + __forceinline__ __device__ void put2DWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, + uint32_t height) { + proxyChan_.put2DWithSignal(dst_, dstOffset, src_, srcOffset, width, height); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. __forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } + __forceinline__ __device__ void put2DWithSignal(uint64_t offset, uint32_t width, uint32_t height) { + put2DWithSignal(offset, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. diff --git a/src/connection.cc b/src/connection.cc index b1d8ea62..b5cafdf6 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -66,10 +66,12 @@ void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, Regist char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); + INFO(MSCCLPP_P2P, + "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu dstOffset %lu srcOffset " + "%lu", + srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch(), dstOffset, srcOffset); MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dst.pitch(), srcPtr + srcOffset, src.pitch(), width, height, cudaMemcpyDeviceToDevice, stream_)); - INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu", - srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch()); } void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 08803d15..c7ca523e 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -219,7 +219,7 @@ TEST_F(CommunicatorTest, BasicWrite) { TEST_F(CommunicatorTest, TileWrite) { if (gEnv->rank >= numRanksToUse) return; - if (numRanksToUse > gEnv->nRanksPerNode) { + if (gEnv->worldSize > gEnv->nRanksPerNode) { // tile write only support single node GTEST_SKIP(); } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index b2e6fd20..cbad17cf 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -140,6 +140,8 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase { void setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + void setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, + size_t sendBuffBytes, size_t pitchSize, void* recvBuff = nullptr, size_t recvBuffBytes = 0); void testPacketPingPong(bool useIbOnly); void testPacketPingPongPerf(bool useIbOnly); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 71c683b1..e9c13250 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -20,15 +20,21 @@ void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } void ProxyChannelOneToOneTest::setupMeshConnections( std::vector>& proxyChannels, bool useIbOnly, void* sendBuff, size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { + setupMeshConnections(proxyChannels, useIbOnly, sendBuff, sendBuffBytes, sendBuffBytes, recvBuff, recvBuffBytes); +} + +void ProxyChannelOneToOneTest::setupMeshConnections( + std::vector>& proxyChannels, bool useIbOnly, void* sendBuff, + size_t sendBuffBytes, size_t pitchSize, void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); mscclpp::TransportFlags transport = (useIbOnly) ? ibTransport : (mscclpp::Transport::CudaIpc | ibTransport); - mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, transport); + mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, pitchSize, transport); mscclpp::RegisteredMemory recvBufRegMem; if (!isInPlace) { - recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, transport); + recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, pitchSize, transport); } for (int r = 0; r < worldSize; r++) { @@ -63,6 +69,72 @@ void ProxyChannelOneToOneTest::setupMeshConnections( __constant__ DeviceHandle gChannelOneToOneTestConstProxyChans; +__device__ size_t getTileElementOffset(int elementId, int width, int rowIndex, int colIndex, int nElemInPitch) { + int rowIndexInTile = elementId / width; + int colIndexInTile = elementId % width; + return (rowIndex + rowIndexInTile) * nElemInPitch + (colIndex + colIndexInTile); +} + +__global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowIndex, int colIndex, int width, + int hight, int* ret) { + DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; + volatile int* sendBuff = (volatile int*)buff; + int nTries = 1000; + int flusher = 0; + size_t offset = rowIndex * pitch + colIndex * sizeof(int); + size_t nElem = width * hight; + size_t nElemPerInPitch = pitch / sizeof(int); + for (int i = 0; i < nTries; i++) { + if (rank == 0) { + if (i > 0) { + if (threadIdx.x == 0) proxyChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + if (sendBuff[tileOffset] != offset + i - 1 + j) { + // printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j); + *ret = 1; + break; + } + } + } + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + sendBuff[tileOffset] = i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + } + if (rank == 1) { + if (threadIdx.x == 0) proxyChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + if (sendBuff[tileOffset] != i + j) { + // printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); + *ret = 1; + break; + } + } + if (i < nTries - 1) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + sendBuff[tileOffset] = offset + i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + } + } + flusher++; + if (flusher == 100) { + if (threadIdx.x == 0) proxyChan.flush(); + flusher = 0; + } + } +} + __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, int* ret) { DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; volatile int* sendBuff = (volatile int*)buff; @@ -156,6 +228,54 @@ TEST_F(ProxyChannelOneToOneTest, PingPongIb) { channelService->stopProxy(); } +TEST_F(ProxyChannelOneToOneTest, PingPongTile) { + if (gEnv->rank >= numRanksToUse) return; + if (gEnv->worldSize > gEnv->nRanksPerNode) { + // tile write only support single node + GTEST_SKIP(); + } + + const int nElem = 4 * 1024 * 1024; + + std::vector> proxyChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + const int pitchSize = 512; // the buff tile is 8192x128 + setupMeshConnections(proxyChannels, false, buff.get(), nElem * sizeof(int), pitchSize); + + ASSERT_EQ(proxyChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannels.data(), + sizeof(DeviceHandle))); + + channelService->startProxy(); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 0, 0, 1, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 128, 32, 64, 64, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 16, 16, 1, 8192, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 5, 0, 128, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 0, 0, 128, 8192, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); +} + __device__ mscclpp::DeviceSyncer gChannelOneToOneTestProxyChansSyncer; template