From ef558a42e8660ca5f4b2652b59845d51ec050595 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Fri, 12 May 2023 05:54:32 +0000
Subject: [PATCH] wip

---
 test/CMakeLists.txt                    |   1 +
 test/allgather_test_host_offloading.cu | 282 +++++++++++++++++++++++++
 2 files changed, 283 insertions(+)
 create mode 100644 test/allgather_test_host_offloading.cu

diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index d7e59bc6..4ce78e68 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -11,6 +11,7 @@ endfunction()
 add_test_executable(bootstrap_test_cpp bootstrap_test_cpp.cc)
 add_test_executable(communicator_test_cpp communicator_test_cpp.cu)
 add_test_executable(allgather_test_cpp allgather_test_cpp.cu)
+add_test_executable(allgather_test_host_offloading allgather_test_host_offloading.cu)
 add_test_executable(ib_test ib_test.cc)
 
 # Unit tests
diff --git a/test/allgather_test_host_offloading.cu b/test/allgather_test_host_offloading.cu
new file mode 100644
index 00000000..c7a80611
--- /dev/null
+++ b/test/allgather_test_host_offloading.cu
@@ -0,0 +1,282 @@
+// NOTE(review): the targets of every angle-bracket #include were stripped
+// when this patch was extracted; the names below are reconstructed from the
+// symbols used in this file (printf/exit, clock_gettime, mscclpp::*,
+// std::vector/unordered_map/shared_ptr) — verify against the original commit.
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+#include "mpi.h"
+#endif // MSCCLPP_USE_MPI_FOR_TESTS
+#include <mscclpp/core.hpp>
+#include <mscclpp/epoch.hpp>
+#include <mscclpp/fifo.hpp>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+// Globals shared by the helpers below; set once in main() from MPI.
+int nranksPerNode;
+int rank;
+int world_size;
+
+// Propagate errors up: check an mscclpp C-API result and return it from the
+// enclosing (mscclppResult_t-returning) function on failure.
+#define MSCCLPPCHECK(call)                                                                 \
+  do {                                                                                     \
+    mscclppResult_t res = call;                                                            \
+    if (res != mscclppSuccess && res != mscclppInProgress) {                               \
+      /* Print the back trace*/                                                            \
+      printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res));  \
+      return res;                                                                          \
+    }                                                                                      \
+  } while (0)
+
+// Check CUDA RT calls: print the error string and abort the process.
+#define CUDACHECK(cmd)                                                                     \
+  do {                                                                                     \
+    cudaError_t err = cmd;                                                                 \
+    if (err != cudaSuccess) {                                                              \
+      printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err));    \
+      exit(EXIT_FAILURE);                                                                  \
+    }                                                                                      \
+  } while (false)
+
+// Measure current time in second.
+// Monotonic wall-clock time in seconds (ns resolution). Currently unused;
+// presumably kept for upcoming timing measurements in this wip patch.
+static double getTime(void)
+{
+  struct timespec tspec;
+  if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
+    printf("clock_gettime failed\n");
+    exit(EXIT_FAILURE);
+  }
+  return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
+}
+
+// Synchronization kernel, launched with one block of `nranks` threads
+// (assumes nranks <= max threads per block — TODO confirm for large jobs).
+// Each thread tid != r increments the epoch shared with peer rank `tid`,
+// thread 0 pushes one trigger into the host proxy FIFO, then every non-self
+// thread waits on its peer's epoch. handles[r] is never dereferenced.
+__global__ void kernel(int r, int nranks, mscclpp::DeviceProxyFifo fifo, mscclpp::DeviceEpoch::DeviceHandle* handles)
+{
+  int tid = threadIdx.x;
+  if (tid != r)
+    handles[tid].epochIncrement();
+  if (tid == 0) {
+    mscclpp::ProxyTrigger trigger;
+    trigger.fst = 1;
+    fifo.push(trigger);
+  }
+  if (tid != r)
+    handles[tid].wait();
+}
+
+// Intra-node (local) rank of a global rank.
+int rankToLocalRank(int rank)
+{
+  return rank % nranksPerNode;
+}
+
+// Index of the node a global rank lives on.
+int rankToNode(int rank)
+{
+  return rank / nranksPerNode;
+}
+
+// Prints the command-line usage string.
+void print_usage(const char* prog)
+{
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  printf("usage: %s IP:PORT [rank nranks]\n", prog);
+#else
+  printf("usage: %s IP:PORT rank nranks\n", prog);
+#endif
+}
+
+// Allocates a device buffer of `dataSize` bytes (zeroed) and a matching host
+// buffer in which only this rank's nelemsPerGPU-element slice is non-zero
+// (element i holds i+1); the device buffer is then seeded with the host
+// contents. The caller owns both buffers.
+void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
+                                        int** data_d)
+{
+  CUDACHECK(cudaMalloc(data_d, dataSize));
+  CUDACHECK(cudaMemset(*data_d, 0, dataSize));
+
+  *data_h = new int[nelemsPerGPU * world_size];
+  for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
+    int val = i + 1;
+    if (i / nelemsPerGPU == (size_t)rank) {
+      (*data_h)[i] = val;
+    } else {
+      (*data_h)[i] = 0;
+    }
+  }
+  CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
+}
+
+// Owns the host-offloading proxy thread plus the per-peer connections,
+// epochs and registered memories (indexed by global rank; slot `rank` is
+// empty/null).
+class MyProxyService {
+private:
+  int deviceNumaNode;
+
+public:
+  // NOTE(review): the template arguments of these containers were stripped
+  // when the patch was extracted; they are reconstructed from how the members
+  // are used below — verify against the original change.
+  std::vector<mscclpp::RegisteredMemory> remoteMemories;
+  mscclpp::RegisteredMemory localMemory;
+  std::vector<std::shared_ptr<mscclpp::HostEpoch>> hostEpochs;
+  std::vector<std::shared_ptr<mscclpp::DeviceEpoch>> deviceEpochs;
+  std::vector<std::shared_ptr<mscclpp::Connection>> connections;
+  // Fix: declared after the vectors so that member initialization order
+  // matches the constructor's initializer list (the original declared `proxy`
+  // first, which trips -Wreorder and constructs `proxy` before the vectors).
+  mscclpp::Proxy proxy;
+
+  MyProxyService()
+    : remoteMemories(world_size), connections(world_size),
+      proxy([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); })
+  {
+    // int cudaDevice;
+    // CUDACHECK(cudaGetDevice(&cudaDevice));
+    // getDeviceNumaNode(cudaDevice, &deviceNumaNode);
+  }
+
+  // Proxy-thread startup hook; NUMA binding is still disabled in this wip.
+  void bindThread()
+  {
+    // if (deviceNumaNode >= 0) {
+    //   numaBind(deviceNumaNode);
+    //   INFO(MSCCLPP_INIT, "NUMA node of DeviceChannelService proxy thread is set to %d", deviceNumaNode);
+    // }
+  }
+
+  // FIFO trigger handler. The payload is ignored for now (wip); flushing the
+  // FIFO tail lets the device reuse the trigger slot.
+  mscclpp::ProxyHandlerResult handleTrigger(mscclpp::ProxyTrigger triggerRaw)
+  {
+    (void)triggerRaw; // do something with it.
+    return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
+  }
+};
+
+// Registers `data_d`, then for every other rank creates a connection
+// (CudaIpc intra-node, IB inter-node), a host epoch and a device epoch, and
+// exchanges the memory registration with that peer. Slot `rank` of the epoch
+// vectors is filled with nullptr so indices stay aligned with global ranks.
+void setupProxyService(mscclpp::Communicator& comm, MyProxyService& proxyService, int* data_d, int dataSize)
+{
+  int thisNode = rankToNode(rank);
+  int cudaNum = rankToLocalRank(rank);
+  std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum);
+  mscclpp::Transport ibTransport = mscclpp::getIBTransportByDeviceName(ibDevStr);
+  // NOTE(review): element type reconstructed (futures produced by
+  // recvMemoryOnSetup and resolved by .get() after comm.setup()) — verify the
+  // exact mscclpp future type name.
+  std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemories(world_size);
+
+  proxyService.localMemory = comm.registerMemory(data_d, dataSize, mscclpp::Transport::CudaIpc | ibTransport);
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) {
+      proxyService.hostEpochs.emplace_back(nullptr);
+      proxyService.deviceEpochs.emplace_back(nullptr);
+      continue;
+    }
+    mscclpp::Transport transport;
+    if (rankToNode(r) == thisNode) {
+      transport = mscclpp::Transport::CudaIpc;
+    } else {
+      transport = ibTransport;
+    }
+    // Connect with all other ranks
+    proxyService.connections[r] = comm.connectOnSetup(r, 0, transport);
+    proxyService.hostEpochs.emplace_back(std::make_shared<mscclpp::HostEpoch>(comm, proxyService.connections[r]));
+    proxyService.deviceEpochs.emplace_back(std::make_shared<mscclpp::DeviceEpoch>(comm, proxyService.connections[r]));
+    comm.sendMemoryOnSetup(proxyService.localMemory, r, 0);
+    remoteMemories[r] = comm.recvMemoryOnSetup(r, 0);
+  }
+
+  comm.setup();
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) {
+      continue;
+    }
+    proxyService.remoteMemories[r] = remoteMemories[r].get();
+  }
+}
+
+// Parses command-line flags; currently only `-datasize <bytes>`.
+// Exits with an error on unknown flags or a missing argument.
+std::unordered_map<std::string, std::string> parseArgs(int argc, char* argv[])
+{
+  std::unordered_map<std::string, std::string> options;
+
+  for (int i = 1; i < argc; i++) {
+    std::string arg = argv[i];
+    if (arg == "-datasize") {
+      if (i + 1 < argc) {
+        options["datasize"] = argv[++i];
+      } else {
+        fprintf(stderr, "Error: -datasize option requires an argument.\n");
+        exit(-1);
+      }
+    } else if (arg == "-help" || arg == "-h") {
+      // Fix: the original exited silently on -h even though print_usage()
+      // exists (and was otherwise unused).
+      print_usage(argv[0]);
+      exit(0);
+    } else {
+      fprintf(stderr, "Error: Unknown option %s\n", argv[i]);
+      exit(-1);
+    }
+  }
+  return options;
+}
+
+int main(int argc, char* argv[])
+{
+  // NOTE(review): MPI_Init was unconditional while MPI_Finalize was guarded
+  // by MSCCLPP_USE_MPI_FOR_TESTS; since this file cannot build without MPI
+  // anyway, both are now unconditional for consistency.
+  MPI_Init(&argc, &argv);
+  auto parsedArgs = parseArgs(argc, argv);
+
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
+  // get the local number of ranks per node with a shared-memory communicator
+  MPI_Comm shmcomm;
+  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
+  int shmrank;
+  MPI_Comm_size(shmcomm, &shmrank);
+  nranksPerNode = shmrank;
+  MPI_Comm_free(&shmcomm);
+
+  int cudaNum = rankToLocalRank(rank);
+  CUDACHECK(cudaSetDevice(cudaNum));
+
+  if (rank == 0)
+    printf("Initializing MSCCL++\n");
+  // NOTE(review): make_shared's template argument was stripped in extraction;
+  // mscclpp::Bootstrap(rank, nranks) matches this construction — verify.
+  auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, world_size);
+  mscclpp::UniqueId uniqueId;
+  if (rank == 0)
+    uniqueId = bootstrap->createUniqueId();
+  MPI_Bcast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
+  bootstrap->initialize(uniqueId);
+  mscclpp::Communicator comm(bootstrap);
+
+  int* data_d;
+  int* data_h;
+  size_t dataSize = 1024 * 1024 * 1024; // default 1 GiB, overridable below
+  if (parsedArgs.find("datasize") != parsedArgs.end()) {
+    dataSize = std::stoul(parsedArgs["datasize"]);
+  }
+  size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;
+
+  if (rank == 0)
+    printf("Initializing data for allgather test\n");
+  initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);
+
+  if (rank == 0)
+    printf("Setting up the connection in MSCCL++\n");
+  MyProxyService proxyService;
+  setupProxyService(comm, proxyService, data_d, dataSize);
+
+  if (rank == 0)
+    printf("Launching MSCCL++ proxy threads\n");
+  proxyService.proxy.start();
+  mscclpp::DeviceProxyFifo fifo = proxyService.proxy.fifo().deviceFifo();
+  if (rank == 0)
+    printf("Testing the correctness of AllGather implementation\n");
+
+  cudaStream_t stream;
+  CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
+  mscclpp::DeviceEpoch::DeviceHandle* deviceHandles;
+  CUDACHECK(cudaMalloc(&deviceHandles, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * world_size));
+  for (int i = 0; i < world_size; ++i) {
+    // Fix: deviceEpochs[rank] is a null shared_ptr, so dereferencing it was
+    // undefined behavior. The kernel never reads handles[rank]; skip it.
+    if (i == rank)
+      continue;
+    auto handle = proxyService.deviceEpochs[i]->deviceHandle();
+    CUDACHECK(cudaMemcpy(&deviceHandles[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice));
+  }
+
+  kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles);
+  CUDACHECK(cudaGetLastError()); // catch launch-configuration errors
+  CUDACHECK(cudaStreamSynchronize(stream));
+
+  CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));
+
+  bool ok = true;
+  for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
+    int val = i + 1;
+    if (data_h[i] != val) {
+      // Fix: %zu is the portable conversion for size_t (was %ld).
+      printf("oh uh! data_h[%zu] (%d) != val (%d)\n", i, data_h[i], val);
+      ok = false;
+      break;
+    }
+  }
+
+  bootstrap->barrier();
+
+  // Fix: the original printed "succeeded" even after a data mismatch.
+  if (ok)
+    printf("Rank %d succeeded!\n", rank);
+
+  // Release test resources (the original leaked all of them).
+  CUDACHECK(cudaFree(deviceHandles));
+  CUDACHECK(cudaFree(data_d));
+  delete[] data_h;
+  CUDACHECK(cudaStreamDestroy(stream));
+
+  MPI_Finalize();
+  return 0;
+}