NVLS support for msccl++ executor (#375)

- Support mote datatype for multicast operation
- Add new OP MULTI_LOAD_REDUCE_STORE to support NVLS
- Modify allocSharedPhysicalCuda, which return std::shared_ptr<T>
instead of std::shared_ptr<PhysicalCudaMemory>
- Add Python support for allocSharedPhysicalCuda

Test passed for `allreduce_nvls.json`
This commit is contained in:
Binyang Li
2024-11-19 22:43:28 -08:00
committed by GitHub
parent 3e51e9b359
commit 28a57b0610
26 changed files with 2116 additions and 212 deletions

View File

@@ -26,6 +26,7 @@ from ._mscclpp import (
PacketType,
version,
is_nvls_supported,
alloc_shared_physical_cuda,
npkit,
)

View File

@@ -23,6 +23,7 @@ extern void register_numa(nb::module_& m);
extern void register_nvls(nb::module_& m);
extern void register_executor(nb::module_& m);
extern void register_npkit(nb::module_& m);
extern void register_gpu_utils(nb::module_& m);
template <typename T>
void def_nonblocking_future(nb::handle& m, const std::string& typestr) {
@@ -194,4 +195,5 @@ NB_MODULE(_mscclpp, m) {
register_nvls(m);
register_executor(m);
register_npkit(m);
register_gpu_utils(m);
}

View File

@@ -0,0 +1,30 @@
#include <nanobind/nanobind.h>
#include <nanobind/stl/shared_ptr.h>
// #include <memory>
#include <mscclpp/gpu_data_types.hpp>
#include <mscclpp/gpu_utils.hpp>
namespace nb = nanobind;
using namespace mscclpp;
class PyCudaMemory {
public:
PyCudaMemory(size_t size) : size_(size) { ptr_ = allocSharedPhysicalCuda<char>(size); }
uintptr_t getPtr() const { return (uintptr_t)(ptr_.get()); }
size_t size() const { return size_; }
private:
std::shared_ptr<char> ptr_;
size_t size_;
};
void register_gpu_utils(nb::module_& m) {
nb::class_<PyCudaMemory>(m, "PyCudaMemory")
.def(nb::init<size_t>(), nb::arg("size"))
.def("get_ptr", &PyCudaMemory::getPtr, "Get the raw pointer")
.def("size", &PyCudaMemory::size, "Get the size of the allocated memory");
m.def(
"alloc_shared_physical_cuda", [](size_t size) { return std::make_shared<PyCudaMemory>(size); }, nb::arg("size"));
}

View File

@@ -30,7 +30,7 @@ void register_nvls(nb::module_& m) {
});
nb::class_<NvlsConnection>(m, "NvlsConnection")
.def("allocate_bind_memory", &NvlsConnection::allocateAndBindCuda)
.def("bind_allocated_memory", &NvlsConnection::bindAllocatedMemory, nb::arg("devicePtr"), nb::arg("size"))
.def("get_multicast_min_granularity", &NvlsConnection::getMultiCastMinGranularity);
m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("allRanks"),

View File

@@ -1,7 +1,7 @@
import os
import cupy as cp
import ctypes
from mscclpp import Transport, ProxyService, SmDevice2DeviceSemaphore
from mscclpp import Transport, ProxyService, SmDevice2DeviceSemaphore, alloc_shared_physical_cuda
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
@@ -443,12 +443,15 @@ class MscclppAllReduce6:
self.nvls_connection = group.make_connection(all_ranks, Transport.Nvls)
min_gran = self.nvls_connection.get_multicast_min_granularity()
aligned_buffer_size = int(((buffer_size + min_gran - 1) // min_gran) * min_gran)
self.nvls_mem_handle = self.nvls_connection.allocate_bind_memory(
aligned_buffer_size
buffer_raw = alloc_shared_physical_cuda(aligned_buffer_size)
self.nvls_mem_handle = self.nvls_connection.bind_allocated_memory(
buffer_raw.get_ptr(), aligned_buffer_size
) # just using recommended size for now
self.memory_ptr = self.nvls_mem_handle.get_device_ptr()
self.cp_memory_ptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(self.memory_ptr, aligned_buffer_size, None), 0)
self.cp_memory_ptr = cp.cuda.MemoryPointer(
cp.cuda.UnownedMemory(self.memory_ptr, aligned_buffer_size, buffer_raw), 0
)
self.memory = cp.ndarray(nelem, memory_dtype, self.cp_memory_ptr)
# create a sm_channel for each remote neighbor

View File

@@ -8,6 +8,8 @@ from mscclpp import (
ExecutionPlan,
PacketType,
npkit,
alloc_shared_physical_cuda,
is_nvls_supported,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
@@ -125,6 +127,18 @@ def dtype_to_mscclpp_dtype(dtype):
raise ValueError(f"Unknown data type: {dtype}")
def allocate_buffer(nelems, dtype):
if is_nvls_supported:
buffer_raw = alloc_shared_physical_cuda(nelems * cp.dtype(dtype).itemsize)
buffer_ptr = cp.cuda.MemoryPointer(
cp.cuda.UnownedMemory(buffer_raw.get_ptr(), buffer_raw.size(), buffer_raw), 0
)
buffer = cp.ndarray(nelems, dtype=dtype, memptr=buffer_ptr)
return buffer
else:
return cp.zeros(nelems, dtype=dtype)
def build_bufs(
execution_plan_name: str,
size: int,
@@ -144,14 +158,14 @@ def build_bufs(
nelems_input = nelems
nelems_output = nelems
result_buf = cp.zeros(nelems_output, dtype=dtype)
result_buf = allocate_buffer(nelems_output, dtype=dtype)
if in_place:
if "allgather" in execution_plan_name:
input_buf = cp.split(result_buf, num_ranks)[rank]
else:
input_buf = result_buf
else:
input_buf = cp.zeros(nelems_input, dtype=dtype)
input_buf = allocate_buffer(nelems_input, dtype=dtype)
test_buf = cp.zeros(nelems_output, dtype=dtype)
return input_buf, result_buf, test_buf