diff --git a/Makefile b/Makefile index e8c5bb25..7b44e154 100644 --- a/Makefile +++ b/Makefile @@ -120,8 +120,8 @@ LDFLAGS := $(NVLDFLAGS) $(GDRCOPY_LDFLAGS) -libverbs -lnuma LIBSRCS := $(addprefix src/,debug.cc utils.cc init.cc proxy.cc ib.cc config.cc) LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc) -LIBSRCS += $(addprefix src/,communicator.cc connection.cc registered_memory.cc epoch.cc) -#LIBSRCS += $(addprefix src/,fifo.cc host_connection.cc proxy_cpp.cc basic_proxy_handler.cc) +LIBSRCS += $(addprefix src/,communicator.cc connection.cc registered_memory.cc) +LIBSRCS += $(addprefix src/,epoch.cc proxy_cpp.cc fifo.cc) ifneq ($(NPKIT), 0) LIBSRCS += $(addprefix src/misc/,npkit.cc) endif @@ -149,7 +149,7 @@ UTOBJTARGETS := $(UTOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) UTBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(UTOBJS)) TESTSDIR := tests -TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu bootstrap_test_cpp.cc communicator_test_cpp.cc) # allgather_test_cpp.cu +TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu bootstrap_test_cpp.cc communicator_test_cpp.cc allgather_test_cpp.cu) TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS)) TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) diff --git a/src/communicator.cc b/src/communicator.cc index 2507c175..074d127f 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -72,7 +72,7 @@ struct MemorySender : public Setuppable int tag_; }; -void Communicator::sendMemoryOnSetup(RegisteredMemory memory, int remoteRank, int tag) +MSCCLPP_API_CPP void Communicator::sendMemoryOnSetup(RegisteredMemory memory, int remoteRank, int tag) { addSetup(std::make_shared(memory, remoteRank, tag)); } @@ -94,7 +94,7 @@ struct MemoryReceiver : public Setuppable int tag_; }; -NonblockingFuture Communicator::recvMemoryOnSetup(int remoteRank, int tag) +MSCCLPP_API_CPP NonblockingFuture Communicator::recvMemoryOnSetup(int remoteRank, int tag) { auto memoryReceiver = std::make_shared(remoteRank, tag); addSetup(memoryReceiver); diff --git a/src/fifo.cc b/src/fifo.cc index c2fdd738..d5d70422 100644 --- a/src/fifo.cc +++ b/src/fifo.cc @@ -1,6 +1,7 @@ #include "alloc.h" #include "checks.hpp" #include "mscclppfifo.hpp" +#include "api.h" #include #include #include @@ -24,7 +25,7 @@ struct HostProxyFifo::Impl cudaStream_t stream; }; -HostProxyFifo::HostProxyFifo() +MSCCLPP_API_CPP HostProxyFifo::HostProxyFifo() { pimpl = std::make_unique(); MSCCLPPTHROW(mscclppCudaCalloc(&pimpl->deviceFifo.head, 1)); @@ -34,27 +35,27 @@ HostProxyFifo::HostProxyFifo() pimpl->hostTail = 0; } -HostProxyFifo::~HostProxyFifo() +MSCCLPP_API_CPP HostProxyFifo::~HostProxyFifo() { - MSCCLPPTHROW(mscclppCudaFree(pimpl->deviceFifo.head)); - MSCCLPPTHROW(mscclppCudaHostFree(pimpl->deviceFifo.triggers)); - MSCCLPPTHROW(mscclppCudaFree(pimpl->deviceFifo.tailReplica)); - CUDATHROW(cudaStreamDestroy(pimpl->stream)); + mscclppCudaFree(pimpl->deviceFifo.head); + mscclppCudaHostFree(pimpl->deviceFifo.triggers); + mscclppCudaFree(pimpl->deviceFifo.tailReplica); + cudaStreamDestroy(pimpl->stream); } -void HostProxyFifo::poll(ProxyTrigger* trigger) +MSCCLPP_API_CPP void HostProxyFifo::poll(ProxyTrigger* trigger) { __m128i xmm0 = _mm_load_si128((__m128i*)&pimpl->deviceFifo.triggers[pimpl->hostTail % MSCCLPP_PROXY_FIFO_SIZE]); _mm_store_si128((__m128i*)trigger, xmm0); } -void HostProxyFifo::pop() +MSCCLPP_API_CPP void HostProxyFifo::pop() { *(volatile uint64_t*)(&pimpl->deviceFifo.triggers[pimpl->hostTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0; (pimpl->hostTail)++; } -void HostProxyFifo::flushTail(bool sync) +MSCCLPP_API_CPP void HostProxyFifo::flushTail(bool sync) { // Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush @@ -66,7 +67,7 @@ void HostProxyFifo::flushTail(bool sync) } } -DeviceProxyFifo HostProxyFifo::toDevice() +MSCCLPP_API_CPP DeviceProxyFifo HostProxyFifo::deviceFifo() { return pimpl->deviceFifo; } diff --git a/src/ib.cc b/src/ib.cc index ec7e95f2..7e77b235 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -11,6 +11,7 @@ #include "debug.h" #include "ib.hpp" #include "mscclpp.hpp" +#include "api.h" #include #include @@ -372,14 +373,14 @@ const std::string& IbCtx::getDevName() const return this->devName; } -int getIBDeviceCount() +MSCCLPP_API_CPP int getIBDeviceCount() { int num; ibv_get_device_list(&num); return num; } -std::string getIBDeviceName(Transport ibTransport) +MSCCLPP_API_CPP std::string getIBDeviceName(Transport ibTransport) { int num; struct ibv_device** devices = ibv_get_device_list(&num); @@ -418,7 +419,7 @@ std::string getIBDeviceName(Transport ibTransport) return devices[ibTransportIndex]->name; } -Transport getIBTransportByDeviceName(const std::string& ibDeviceName) +MSCCLPP_API_CPP Transport getIBTransportByDeviceName(const std::string& ibDeviceName) { int num; struct ibv_device** devices = ibv_get_device_list(&num); diff --git a/src/include/channel.hpp b/src/include/channel.hpp index ace57661..42826f4f 100644 --- a/src/include/channel.hpp +++ b/src/include/channel.hpp @@ -13,7 +13,8 @@ namespace channel { class Channel { public: - Channel(std::shared_ptr connection) : connection_(connection), epoch_(std::make_shared()) {}; + Channel(Communicator& communicator, std::shared_ptr connection) + : connection_(connection), epoch_(std::make_shared(communicator, connection)) {}; Connection& connection() { return *connection_; } Epoch& epoch() { return *epoch_; } @@ -176,10 +177,10 @@ inline ProxyHandler makeChannelProxyHandler(DeviceChannelService& channelService class DeviceChannelService { public: - DeviceChannelService() : proxy_([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }) {} + DeviceChannelService(Communicator& communicator) : communicator_(communicator), proxy_([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }) {} ChannelId addChannel(std::shared_ptr connection) { - channels_.push_back(Channel(connection)); + channels_.push_back(Channel(communicator_, connection)); return channels_.size() - 1; } @@ -195,6 +196,7 @@ public: void stopProxy() { proxy_.stop(); } private: + Communicator& communicator_; std::vector channels_; std::vector memories_; Proxy proxy_; diff --git a/src/proxy_cpp.cc b/src/proxy_cpp.cc index b55d6995..2fb8c2b0 100644 --- a/src/proxy_cpp.cc +++ b/src/proxy_cpp.cc @@ -1,3 +1,4 @@ +#include "proxy.hpp" #include "api.h" #include "mscclpp.hpp" #include "utils.h" diff --git a/tests/allgather_test_cpp.cu b/tests/allgather_test_cpp.cu index 791e2ca9..34050814 100644 --- a/tests/allgather_test_cpp.cu +++ b/tests/allgather_test_cpp.cu @@ -49,9 +49,9 @@ static double getTime(void) return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; } -__constant__ mscclpp::channel::SimpleDeviceConnection constDevConns[16]; +__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16]; -__device__ void allgather0(mscclpp::channel::SimpleDeviceConnection devConn, int rank, int world_size, int remoteRank, +__device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) { // this allgather is really simple and implemented as an alltoall @@ -59,19 +59,19 @@ __device__ void allgather0(mscclpp::channel::SimpleDeviceConnection devConn, int // this thread's role is a sender role // put your data asynchronously if ((threadIdx.x % 32) == 0) - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); // make sure everyone is put their data before some thread randomly blocks everyone else in signal __syncthreads(); // push with flag and sync to make sure the data is received if ((threadIdx.x % 32) == 0) - devConn.flush(); + devChan.flush(); // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready if ((threadIdx.x % 32) == 0) - devConn.wait(); + devChan.wait(); } -__device__ void localAllGather(mscclpp::channel::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, +__device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) { // this allgather algorithm works as follows: @@ -84,25 +84,25 @@ __device__ void localAllGather(mscclpp::channel::SimpleDeviceConnection devConn, if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { // put your data to GPU (rank+i) % nranksPerNode and signal in one call if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(offset, size); + devChan.putWithSignalAndFlush(offset, size); } // wait for the data from GPU (rank-i) % nranksPerNode to arrive if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { if ((threadIdx.x % 32) == 0) - devConn.wait(); + devChan.wait(); } asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); } } -__device__ void allgather1(mscclpp::channel::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, +__device__ void allgather1(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode, int remoteRank, size_t nelemsPerGPU) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); } -__device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, +__device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode, int remoteRank, size_t nelemsPerGPU) { // this allgather is a pipelined and hierarchical one and only works for two nodes @@ -120,17 +120,17 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int // Step 1 // local allgather if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); } // cross-node exchange if (remoteRank % nranksPerNode == rank % nranksPerNode) { // opposite side if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), + devChan.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); if ((threadIdx.x % 32) == 0) - devConn.wait(); + devChan.wait(); } __syncthreads(); @@ -139,7 +139,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int // local allgather int otherNghr = (rank + nranksPerNode) % world_size; if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), + localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); } @@ -147,11 +147,11 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int if (remoteRank % nranksPerNode == rank % nranksPerNode) { // opposite side if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * + devChan.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), nelemsPerGPU / pipelineSize * sizeof(int)); if ((threadIdx.x % 32) == 0) - devConn.wait(); + devChan.wait(); } __syncthreads(); @@ -159,7 +159,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int // Step 3 // local allgather if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, + localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), nelemsPerGPU / pipelineSize * sizeof(int)); } @@ -167,18 +167,18 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceConnection devConn, int __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) { - // find the mapping between remoteRank and devConns + // find the mapping between remoteRank and devChans int warpId = threadIdx.x / 32; int remoteRank = (warpId < rank) ? warpId : warpId + 1; // Each warp is responsible for one of the remote ranks - mscclpp::channel::SimpleDeviceConnection devConn = constDevConns[warpId]; + mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[warpId]; if (kernel == 0) - allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); + allgather0(devChan, rank, world_size, remoteRank, nelemsPerGPU); else if (kernel == 1) - allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); + allgather1(devChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); else if (kernel == 2) - allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); + allgather2(devChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); } int rankToLocalRank(int rank) @@ -218,41 +218,44 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); } -void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& comm, int* data_d, size_t dataSize) +void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& comm, mscclpp::channel::DeviceChannelService& channelService, int* data_d, size_t dataSize) { int thisNode = rankToNode(rank); int cudaNum = rankToLocalRank(rank); std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); mscclpp::Transport ibTransport = mscclpp::getIBTransportByDeviceName(ibDevStr); + std::vector channelIds; + std::vector localMemories; + std::vector> remoteMemories; for (int r = 0; r < world_size; ++r) { if (r == rank) continue; mscclpp::Transport transport; - const char* ibDev = ibDevStr.c_str(); if (rankToNode(r) == thisNode) { - ibDev = NULL; - transportType = mscclpp::Transport::CudaIpc; + transport = mscclpp::Transport::CudaIpc; } else { - transportType = ibTransport; + transport = ibTransport; } // Connect with all other ranks - auto connId = channelService.addChannel(comm.connect(r, 0, transportType)); - auto memoryId = channelService.addMemory(comm.registerMemory(data_d, dataSize, mscclpp::Transport::CudaIpc | ibTransport)); + channelIds.push_back(channelService.addChannel(comm.connect(r, 0, transport))); + auto memory = comm.registerMemory(data_d, dataSize, mscclpp::Transport::CudaIpc | ibTransport); + localMemories.push_back(memory); + comm.sendMemoryOnSetup(memory, r, 0); + remoteMemories.push_back(comm.recvMemoryOnSetup(r, 0)); } comm.setup(); - mscclpp::channel::DeviceChannelService channelService; + std::vector devChannels; + for (size_t i = 0; i < channelIds.size(); ++i) { + devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(channelService.deviceChannel(channelIds[i]), + channelService.addMemory(remoteMemories[i].get()), channelService.addMemory(localMemories[i]))); + } - std::vector devConns; - std::transform( - hostConns.begin(), hostConns.end(), std::back_inserter(devConns), - [](std::shared_ptr& hostConn) { return mscclpp::SimpleDeviceConnection(*hostConn); }); - - assert(devConns.size() < sizeof(constDevConns) / sizeof(mscclpp::SimpleDeviceConnection)); + assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel)); CUDACHECK( - cudaMemcpyToSymbol(constDevConns, devConns.data(), sizeof(mscclpp::SimpleDeviceConnection) * devConns.size())); + cudaMemcpyToSymbol(constDevChans, devChannels.data(), sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size())); } void printUsage(const char* prog, bool isMpi) @@ -405,7 +408,10 @@ int main(int argc, const char* argv[]) try { if (rank == 0) printf("Initializing MSCCL++\n"); - mscclpp::Communicator comm(world_size, ip_port, rank); + auto bootstrapper = std::make_shared(rank, world_size); + bootstrapper->initialize(ip_port); + mscclpp::Communicator comm(bootstrapper); + mscclpp::channel::DeviceChannelService channelService(comm); if (rank == 0) printf("Initializing data for allgather test\n"); @@ -413,11 +419,11 @@ int main(int argc, const char* argv[]) if (rank == 0) printf("Setting up the connection in MSCCL++\n"); - setupMscclppConnections(rank, world_size, comm, data_d, dataSize); + setupMscclppConnections(rank, world_size, comm, channelService, data_d, dataSize); if (rank == 0) printf("Launching MSCCL++ proxy threads\n"); - comm.startProxying(); + channelService.startProxy(); if (rank == 0) printf("Testing the correctness of AllGather implementation\n"); @@ -437,7 +443,7 @@ int main(int argc, const char* argv[]) } int tmp[16]; // A simple barrier - comm.bootstrapAllGather(tmp, sizeof(int)); + bootstrapper->allGather(tmp, sizeof(int)); if (rank == 0) printf("Successfully checked the correctness\n"); @@ -446,12 +452,12 @@ int main(int argc, const char* argv[]) if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); CUDACHECK(cudaStreamSynchronize(stream)); - comm.bootstrapAllGather(tmp, sizeof(int)); + bootstrapper->allGather(tmp, sizeof(int)); for (int i = 0; i < iterwithoutcudagraph; ++i) { kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); } CUDACHECK(cudaStreamSynchronize(stream)); - comm.bootstrapAllGather(tmp, sizeof(int)); + bootstrapper->allGather(tmp, sizeof(int)); // cudaGraph Capture int cudagraphiter = 10; @@ -480,7 +486,7 @@ int main(int argc, const char* argv[]) if (rank == 0) printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, cudagraphiter); - comm.bootstrapAllGather(tmp, sizeof(int)); + bootstrapper->allGather(tmp, sizeof(int)); double t0, t1, ms, time_in_us; t0 = getTime(); for (int i = 0; i < cudagraphlaunch; ++i) { @@ -493,11 +499,11 @@ int main(int argc, const char* argv[]) time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, (double)(dataSize) / 1e9 / (time_in_us / 1e6)); - comm.bootstrapAllGather(tmp, sizeof(int)); + bootstrapper->allGather(tmp, sizeof(int)); if (rank == 0) printf("Stopping MSCCL++ proxy threads\n"); - comm.stopProxying(); + channelService.stopProxy(); } catch (std::exception& e) { // todo: throw exceptions in the implementation and process them here