Port python tests to mscclpp.
Please run
`mpirun -tag-output -np 8 pytest ./python/test/test_mscclpp.py -x` to start pytest

---------

Co-authored-by: Saeed Maleki <saemal@microsoft.com>
Co-authored-by: Changho Hwang <changhohwang@microsoft.com>
Co-authored-by: Saeed Maleki <30272783+saeedmaleki@users.noreply.github.com>
This commit is contained in:
Binyang2014
2023-09-01 21:22:11 +08:00
committed by GitHub
parent 3df18d20a3
commit 858e381829
39 changed files with 1186 additions and 18 deletions

View File

@@ -1,14 +1,17 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# Locate the Python used to build the nanobind extension modules.
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
# Fetch nanobind at configure time (pinned tag).
include(FetchContent)
FetchContent_Declare(nanobind GIT_REPOSITORY https://github.com/wjakob/nanobind.git GIT_TAG v1.4.0)
FetchContent_MakeAvailable(nanobind)
add_subdirectory(mscclpp)
add_subdirectory(test)
# Helper target that copies the built extension modules next to the python
# sources so the tests can import them in-tree.
# NOTE(review): the cpython-38-x86_64-linux-gnu suffix hardcodes Python 3.8 /
# linux-x86_64 — confirm whether this should use the detected SOABI instead.
add_custom_target(pylib-copy)
add_custom_command(TARGET pylib-copy POST_BUILD
    COMMAND ${CMAKE_COMMAND} -E copy_if_different
        ${CMAKE_CURRENT_BINARY_DIR}/mscclpp/_mscclpp.cpython-38-x86_64-linux-gnu.so
        ${CMAKE_CURRENT_SOURCE_DIR}/mscclpp
    COMMAND ${CMAKE_COMMAND} -E copy_if_different
        ${CMAKE_CURRENT_BINARY_DIR}/test/_ext.cpython-38-x86_64-linux-gnu.so
        ${CMAKE_CURRENT_SOURCE_DIR}/test/_cpp
    COMMAND ${CMAKE_COMMAND} -E echo "Copy python libraries"
)
# Build the _mscclpp python extension from all binding sources.
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
nanobind_add_module(mscclpp_py ${SOURCES})
set_target_properties(mscclpp_py PROPERTIES OUTPUT_NAME _mscclpp)
target_link_libraries(mscclpp_py PRIVATE mscclpp_static)
target_include_directories(mscclpp_py PRIVATE ${CUDAToolkit_INCLUDE_DIRS})
install(TARGETS mscclpp_py LIBRARY DESTINATION .)

View File

@@ -0,0 +1,14 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# Build the `_mscclpp` nanobind extension for the python package.
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
include(FetchContent)
FetchContent_Declare(nanobind GIT_REPOSITORY https://github.com/wjakob/nanobind.git GIT_TAG v1.4.0)
FetchContent_MakeAvailable(nanobind)
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
nanobind_add_module(mscclpp_py ${SOURCES})
# Importable module name is `_mscclpp`.
set_target_properties(mscclpp_py PROPERTIES OUTPUT_NAME _mscclpp)
target_link_libraries(mscclpp_py PRIVATE mscclpp_static)
target_include_directories(mscclpp_py PRIVATE ${CUDAToolkit_INCLUDE_DIRS})
install(TARGETS mscclpp_py LIBRARY DESTINATION .)

View File

@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# Build the `_ext` nanobind extension used only by the python tests.
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
include(FetchContent)
FetchContent_Declare(nanobind GIT_REPOSITORY https://github.com/wjakob/nanobind.git GIT_TAG v1.4.0)
FetchContent_MakeAvailable(nanobind)
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
nanobind_add_module(mscclpp_py_test ${SOURCES})
# Importable module name is `_ext` (see python/test/_cpp).
set_target_properties(mscclpp_py_test PROPERTIES OUTPUT_NAME _ext)
target_link_libraries(mscclpp_py_test PRIVATE mscclpp_static)
target_include_directories(mscclpp_py_test PRIVATE ${CUDAToolkit_INCLUDE_DIRS})

0
python/test/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <cuda.h>
#include <nanobind/nanobind.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/vector.h>
#include <iostream>
#include <memory>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <mscclpp/fifo.hpp>
#include <mscclpp/numa.hpp>
#include <mscclpp/proxy.hpp>
#include <mscclpp/semaphore.hpp>
#include <vector>
namespace nb = nanobind;
// Minimal host-side proxy used by the python proxy test. Every trigger popped
// from the device FIFO causes this rank's slice of the shared buffer to be
// written to all peers, each write followed by a host-to-device semaphore
// signal and a connection flush.
class MyProxyService {
 private:
  int deviceNumaNode_;
  int my_rank_, nranks_, dataSize_;
  std::vector<std::shared_ptr<mscclpp::Connection>> connections_;
  std::vector<std::shared_ptr<mscclpp::RegisteredMemory>> allRegMem_;
  std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> semaphores_;
  mscclpp::Proxy proxy_;

 public:
  // conns/allRegMem/semaphores are indexed by rank; the entries at my_rank are
  // placeholders (see the python test) and are never dereferenced here.
  MyProxyService(int my_rank, int nranks, int dataSize, std::vector<std::shared_ptr<mscclpp::Connection>> conns,
                 std::vector<std::shared_ptr<mscclpp::RegisteredMemory>> allRegMem,
                 std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> semaphores)
      : my_rank_(my_rank),
        nranks_(nranks),
        dataSize_(dataSize),
        connections_(conns),
        allRegMem_(allRegMem),
        semaphores_(semaphores),
        proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
    // Remember the NUMA node of the current GPU so the proxy thread can be
    // pinned close to it in bindThread().
    int cudaDevice;
    cudaGetDevice(&cudaDevice);
    deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);
  }

  // Proxy-thread start hook: bind the proxy thread to the GPU's NUMA node.
  void bindThread() {
    if (deviceNumaNode_ >= 0) {
      mscclpp::numaBind(deviceNumaNode_);
    }
  }

  // Handle one trigger: write my slice to every peer (round-robin starting at
  // my_rank+1), signal the peer's semaphore, and flush the connection.
  mscclpp::ProxyHandlerResult handleTrigger(mscclpp::ProxyTrigger) {
    int dataSizePerRank = dataSize_ / nranks_;
    for (int r = 1; r < nranks_; ++r) {
      int nghr = (my_rank_ + r) % nranks_;
      connections_[nghr]->write(*allRegMem_[nghr], my_rank_ * (uint64_t)dataSizePerRank, *allRegMem_[my_rank_],
                                my_rank_ * (uint64_t)dataSizePerRank, dataSizePerRank);
      semaphores_[nghr]->signal();
      connections_[nghr]->flush();
    }
    return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
  }

  void start() { proxy_.start(); }
  void stop() { proxy_.stop(); }

  // Device-side handle the test kernel uses to push triggers to this proxy.
  mscclpp::FifoDeviceHandle fifoDeviceHandle() { return proxy_.fifo().deviceHandle(); }
};
// Register MyProxyService and its methods with the nanobind module.
void init_mscclpp_proxy_test_module(nb::module_ &m) {
  nb::class_<MyProxyService>(m, "MyProxyService")
      .def(nb::init<int, int, int, std::vector<std::shared_ptr<mscclpp::Connection>>,
                    std::vector<std::shared_ptr<mscclpp::RegisteredMemory>>,
                    std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>>>(),
           nb::arg("rank"), nb::arg("nranks"), nb::arg("data_size"), nb::arg("conn_vec"), nb::arg("reg_mem_vec"),
           nb::arg("h2d_sem_vec"))
      .def("fifo_device_handle", &MyProxyService::fifoDeviceHandle)
      .def("start", &MyProxyService::start)
      .def("stop", &MyProxyService::stop);
}

// Extension entry point; imported from python as `_ext`.
NB_MODULE(_ext, m) { init_mscclpp_proxy_test_module(m); }

View File

@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <mscclpp/semaphore_device.hpp>
// be careful about using semaphore[my_rank] as it is an invalid semaphore and it is there just for simplicity of
// indexing
// One thread per peer rank: thread `peer` handshakes with rank `peer` by
// signaling that rank's device-to-device semaphore and then waiting for the
// matching signal back. semaphores[my_rank] is a placeholder and is skipped.
extern "C" __global__ void __launch_bounds__(1024, 1)
    d2d_semaphore(mscclpp::SmDevice2DeviceSemaphoreDeviceHandle* semaphores, int my_rank, int nranks) {
  const int peer = threadIdx.x;
  if (peer >= nranks || peer == my_rank) return;
  semaphores[peer].signal();
  semaphores[peer].wait();
}

12
python/test/fifo_test.cu Normal file
View File

@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <stdio.h>
#include "mscclpp/fifo_device.hpp"
// Pushes a single trigger with fst=123 so the host can observe it via
// Fifo.poll(). Both fields are set explicitly: previously `snd` was left
// uninitialized, pushing indeterminate data through the FIFO (proxy_test.cu
// already zeroes it).
extern "C" __global__ void __launch_bounds__(1024, 1) fifo(mscclpp::FifoDeviceHandle fifo) {
  mscclpp::ProxyTrigger trigger;
  trigger.fst = 123;
  trigger.snd = 0;
  fifo.push(trigger);
}

View File

@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <mscclpp/semaphore_device.hpp>
// be careful about using semaphore[my_rank] as it is an invalid semaphore and it is there just for simplicity of
// indexing
// One thread per peer rank: thread `peer` blocks until the host proxy signals
// rank `peer`'s host-to-device semaphore. semaphores[my_rank] is a
// placeholder and is skipped.
extern "C" __global__ void __launch_bounds__(1024, 1)
    h2d_semaphore(mscclpp::Host2DeviceSemaphoreDeviceHandle* semaphores, int my_rank, int nranks) {
  const int peer = threadIdx.x;
  if (peer >= nranks || peer == my_rank) return;
  semaphores[peer].wait();
}

View File

@@ -0,0 +1,154 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from __future__ import annotations
import logging
from typing import Type
import cupy as cp
from mscclpp import (
Communicator,
Connection,
Host2DeviceSemaphore,
Host2HostSemaphore,
ProxyService,
RegisteredMemory,
SimpleProxyChannel,
SmChannel,
SmDevice2DeviceSemaphore,
TcpBootstrap,
Transport,
TransportFlags,
)
import numpy as np
from .mscclpp_mpi import MpiGroup
logger = logging.getLogger(__name__)
class MscclppGroup:
    """Convenience wrapper around a mscclpp TcpBootstrap + Communicator for an
    MPI-launched group of processes.

    All `make_*` helpers return dicts keyed by remote rank.
    """

    def __init__(self, mpi_group: MpiGroup, interfaceIpPortTrio=""):
        # Bootstrap either via an MPI-broadcast unique id (default) or via an
        # explicit "interface:ip:port" trio.
        self.bootstrap = TcpBootstrap.create(mpi_group.comm.rank, mpi_group.comm.size)
        if interfaceIpPortTrio == "":
            uniq_id = None
            if mpi_group.comm.rank == 0:
                # similar to NCCL's unique id
                uniq_id = self.bootstrap.create_unique_id()
            uniq_id_global = mpi_group.comm.bcast(uniq_id, 0)
            self.bootstrap.initialize(uniq_id_global)
        else:
            # use this instead
            self.bootstrap.initialize(interfaceIpPortTrio)
        self.communicator = Communicator(self.bootstrap)
        self.my_rank = self.bootstrap.get_rank()
        self.nranks = self.bootstrap.get_n_ranks()

    def barrier(self):
        # Barrier over the bootstrap network (not MPI).
        self.bootstrap.barrier()

    def send(self, tensor: np.ndarray, peer: int, tag: int):
        # Bootstrap send of a host ndarray's raw bytes.
        self.bootstrap.send(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)

    def recv(self, tensor: np.ndarray, peer: int, tag: int):
        # Bootstrap receive into a host ndarray's raw bytes.
        self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)

    def my_ib_device(self, local_rank: int) -> Transport:
        # Map a local rank (0-7) to its InfiniBand transport.
        if local_rank == 0:
            return Transport.IB0
        if local_rank == 1:
            return Transport.IB1
        if local_rank == 2:
            return Transport.IB2
        if local_rank == 3:
            return Transport.IB3
        if local_rank == 4:
            return Transport.IB4
        if local_rank == 5:
            return Transport.IB5
        if local_rank == 6:
            return Transport.IB6
        if local_rank == 7:
            return Transport.IB7
        else:
            assert False  # only 8 IBs are supported

    def make_connection(self, remote_ranks: list[int], transport: Transport) -> dict[int, Connection]:
        # Connect to each remote rank with the given transport; setup()
        # completes all pending connections collectively.
        connections = {}
        for rank in remote_ranks:
            connections[rank] = self.communicator.connect_on_setup(rank, 0, transport)
        self.communicator.setup()
        return connections

    def register_tensor_with_connections(
        self, tensor: Type[cp.ndarray] or Type[np.ndarray], connections: dict[int, Connection]
    ) -> dict[int, RegisteredMemory]:
        # Register `tensor` for all transports in use and exchange the
        # registration with every connected peer. The returned dict also
        # contains this rank's own RegisteredMemory under self.my_rank.
        transport_flags = TransportFlags()
        for rank in connections:
            transport_flags |= connections[rank].transport()
        data_ptr = tensor.data.ptr if isinstance(tensor, cp.ndarray) else tensor.ctypes.data
        local_reg_memory = self.communicator.register_memory(data_ptr, tensor.size * tensor.itemsize, transport_flags)
        all_registered_memories = {}
        all_registered_memories[self.my_rank] = local_reg_memory
        future_memories = {}
        for rank in connections:
            self.communicator.send_memory_on_setup(local_reg_memory, rank, 0)
            future_memories[rank] = self.communicator.recv_memory_on_setup(rank, 0)
        self.communicator.setup()
        for rank in connections:
            all_registered_memories[rank] = future_memories[rank].get()
        return all_registered_memories

    def make_semaphore(
        self,
        connections: dict[int, Connection],
        semaphore_type: Type[Host2HostSemaphore] or Type[Host2DeviceSemaphore] or Type[SmDevice2DeviceSemaphore],
    ) -> dict[int, Host2HostSemaphore]:
        # Create one semaphore of the requested type per connection.
        semaphores = {}
        for rank in connections:
            semaphores[rank] = semaphore_type(self.communicator, connections[rank])
        self.communicator.setup()
        return semaphores

    def make_sm_channels(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> dict[int, SmChannel]:
        # SM (GPU-kernel-driven) channels over the registered tensor.
        semaphores = self.make_semaphore(connections, SmDevice2DeviceSemaphore)
        registered_memories = self.register_tensor_with_connections(tensor, connections)
        channels = {}
        for rank in connections:
            channels[rank] = SmChannel(semaphores[rank], registered_memories[rank], tensor.data.ptr)
        return channels

    def make_sm_channels_with_packet(
        self, tensor: cp.ndarray, packetTensor: cp.ndarray, connections: dict[int, Connection]
    ) -> dict[int, SmChannel]:
        # SM channels that transfer through a separate packet (scratch) buffer;
        # note it is the packet tensor that gets registered with peers.
        semaphores = self.make_semaphore(connections, SmDevice2DeviceSemaphore)
        registered_memories = self.register_tensor_with_connections(packetTensor, connections)
        channels = {}
        for rank in connections:
            channels[rank] = SmChannel(
                semaphores[rank],
                registered_memories[rank],
                tensor.data.ptr,
                packetTensor.data.ptr,
            )
        return channels

    def make_proxy_channels_with_packet(
        self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection]
    ) -> dict[int, SmChannel]:
        # Proxy (host-driven) channels. `tensor` here is whichever buffer the
        # caller wants registered (data or scratch memory).
        semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
        registered_memories = self.register_tensor_with_connections(tensor, connections)
        memory_ids = {}
        semaphore_ids = {}
        for rank in registered_memories:
            memory_ids[rank] = proxy_service.add_memory(registered_memories[rank])
        for rank in semaphores:
            semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank])
        channels = {}
        for rank in semaphores:
            channels[rank] = SimpleProxyChannel(
                proxy_service.proxy_channel(semaphore_ids[rank]),
                memory_ids[rank],
                memory_ids[self.my_rank],
            )
        return channels

View File

@@ -0,0 +1,72 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import atexit
import logging
import cupy as cp
import mpi4py
mpi4py.rc.initialize = False
mpi4py.rc.finalize = False
from mpi4py import MPI
import pytest
N_GPUS_PER_NODE = 8
logging.basicConfig(level=logging.INFO)
def init_mpi():
    # Initialize MPI (auto-init is disabled above) and bind this process to a
    # GPU on its node, using the shared-memory communicator size as the number
    # of local GPUs.
    if not MPI.Is_initialized():
        MPI.Init()
        shm_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED, 0, MPI.INFO_NULL)
        # NOTE(review): this assignment is local to the function; the
        # module-level N_GPUS_PER_NODE stays 8 — confirm that is intended.
        N_GPUS_PER_NODE = shm_comm.size
        shm_comm.Free()
        cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()
# Define a function to finalize MPI
def finalize_mpi():
    """Tear down MPI at interpreter exit, if it is still initialized."""
    if not MPI.Is_initialized():
        return
    MPI.Finalize()
# Register the function to be called on exit
atexit.register(finalize_mpi)
class MpiGroup:
    """A sub-communicator over the given COMM_WORLD ranks."""

    def __init__(self, ranks: list):
        world_group = MPI.COMM_WORLD.group
        group = world_group.Incl(ranks)
        # comm is MPI.COMM_NULL on ranks that are not part of `ranks`.
        self.comm = MPI.COMM_WORLD.Create(group)
@pytest.fixture
def mpi_group(request: pytest.FixtureRequest):
    # Synchronize all ranks before each test; ranks that are not members of
    # the parametrized group receive None and skip (see parametrize_mpi_groups).
    MPI.COMM_WORLD.barrier()
    if request.param is None:
        pytest.skip(f"Skip for rank {MPI.COMM_WORLD.rank}")
    yield request.param
def parametrize_mpi_groups(*tuples: tuple):
    """Parametrize a test over MPI sub-groups of the given sizes.

    Sizes larger than COMM_WORLD are dropped with a warning; ranks outside a
    group get None so the `mpi_group` fixture skips them.
    """

    def decorator(func):
        mpi_groups = []
        for group_size in list(tuples):
            if MPI.COMM_WORLD.size < group_size:
                logging.warning(f"MPI.COMM_WORLD.size < {group_size}, skip")
                continue
            mpi_group = MpiGroup(list(range(group_size)))
            if mpi_group.comm == MPI.COMM_NULL:
                mpi_groups.append(None)
            else:
                mpi_groups.append(mpi_group)
        return pytest.mark.parametrize("mpi_group", mpi_groups, indirect=True)(func)

    return decorator
init_mpi()

22
python/test/proxy_test.cu Normal file
View File

@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <mscclpp/fifo_device.hpp>
#include <mscclpp/semaphore_device.hpp>
extern "C" __global__ void __launch_bounds__(1024, 1) proxy(int my_rank, int nranks, mscclpp::FifoDeviceHandle fifo,
                                                            mscclpp::Host2DeviceSemaphoreDeviceHandle* semaphores) {
  int tid = threadIdx.x;
  // Thread 0 kicks the host-side proxy via the FIFO, then blocks until the
  // host has consumed the trigger.
  if (tid == 0) {
    mscclpp::ProxyTrigger trigger;
    trigger.fst = 123;
    trigger.snd = 0;
    uint64_t currentFifoHead = fifo.push(trigger);
    // wait for the work to be done in cpu side
    fifo.sync(currentFifoHead);
  }
  __syncthreads();
  // One thread per peer waits for the semaphore the proxy signals after it
  // finished writing that peer's slice.
  if (tid < nranks && tid != my_rank) {
    semaphores[tid].wait();
  }
}

View File

@@ -0,0 +1,6 @@
cuda-python==12.1.0
mpi4py==3.1.4
netifaces==0.11.0
numpy==1.22.2
pytest==7.2.2
cupy-cuda11x

View File

@@ -0,0 +1,6 @@
cuda-python==12.1.0
mpi4py==3.1.4
netifaces==0.11.0
numpy==1.22.2
pytest==7.2.2
cupy-cuda12x

View File

@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <mscclpp/packet.hpp>
#include <mscclpp/proxy_channel_device.hpp>
// Be careful about using channels[my_rank]: it is an invalid channel and exists just for simplicity of indexing.
extern "C" __global__ void __launch_bounds__(1024, 1)
    simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks, int* data,
                         int* scratch, int num_elements, int use_packet) {
  int tid = threadIdx.x;
  int nthreads = blockDim.x;
  // Each rank owns a contiguous slice of `data`; packet-path offsets into
  // `scratch` are doubled because LL packets carry a flag per payload word.
  uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks;
  uint64_t my_offset = size_per_rank * my_rank;
  int nthreads_per_rank = nthreads / nranks;
  int my_nghr = tid / nthreads_per_rank;
  uint64_t my_nghr_offset = size_per_rank * my_nghr;
  __syncthreads();
  int flag = 123;
  if (use_packet) {
    // Stage my slice into local scratch as packets, push the packets to every
    // peer, then unpack the slice received from my assigned neighbor.
    mscclpp::putPackets(scratch, 2 * my_offset, data, my_offset, size_per_rank, tid, nthreads, flag);
    __syncthreads();
    if (tid < nranks && tid != my_rank) {
      channels[tid].put(2 * my_offset, 2 * my_offset, 2 * size_per_rank);
    }
    if (my_nghr != my_rank && my_nghr < nranks)
      mscclpp::getPackets(data, my_nghr_offset, scratch, 2 * my_nghr_offset, size_per_rank, tid % nthreads_per_rank,
                          nthreads_per_rank, flag);
  } else {
    // Non-packet path: one thread per peer drives put+signal+flush, then
    // waits for the peer's signal.
    if (tid < nranks && tid != my_rank) {
      channels[tid].putWithSignalAndFlush(my_offset, my_offset, size_per_rank);
      channels[tid].wait();
    }
  }
}

View File

@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <mscclpp/sm_channel_device.hpp>
// Be careful about using channels[my_rank]: it is an invalid channel and exists just for simplicity of indexing.
extern "C" __global__ void __launch_bounds__(1024, 1)
    sm_channel(mscclpp::SmChannelDeviceHandle* channels, int my_rank, int nranks, int num_elements, int use_packet) {
  // One block per peer rank: block `bid` exchanges slices with rank `bid`.
  int tid = threadIdx.x;
  int bid = blockIdx.x;
  uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks;
  uint64_t my_offset = size_per_rank * my_rank;
  uint64_t my_nghr_offset = size_per_rank * bid;
  int flag = 123;
  if (bid < nranks && bid != my_rank) {
    if (use_packet) {
      // Packet path: the registered scratch buffer is 2x-sized (payload+flag),
      // hence the doubled packet offsets.
      channels[bid].putPackets(2 * my_offset, my_offset, size_per_rank, tid, blockDim.x, flag);
      channels[bid].getPackets(my_nghr_offset, 2 * my_nghr_offset, size_per_rank, tid, blockDim.x, flag);
    } else {
      channels[bid].put(my_offset, my_offset, size_per_rank, tid, blockDim.x);
      __syncthreads();
      // Single-thread handshake per block to order the put against the peer.
      if (!use_packet && tid == 0) {
        channels[bid].signal();
        channels[bid].wait();
      }
    }
  }
}

478
python/test/test_mscclpp.py Normal file
View File

@@ -0,0 +1,478 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from concurrent.futures import ThreadPoolExecutor
import time
import cupy as cp
import numpy as np
import netifaces as ni
import pytest
from mscclpp import (
Fifo,
Host2DeviceSemaphore,
Host2HostSemaphore,
ProxyService,
SmDevice2DeviceSemaphore,
Transport,
)
from ._cpp import _ext
from .mscclpp_group import MscclppGroup
from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
from .utils import KernelBuilder, pack
ethernet_interface_name = "eth0"
def all_ranks_on_the_same_node(mpi_group: MpiGroup):
    # Heuristic: compare the eth0 IPs of rank 0 and the last rank. Skips the
    # calling test entirely when the interface is absent.
    # NOTE(review): only the first and last ranks are compared — assumes ranks
    # are placed contiguously across nodes; confirm for other launch layouts.
    if (ethernet_interface_name in ni.interfaces()) is False:
        pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
    my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
    root_ip = mpi_group.comm.bcast(my_ip, 0)
    last_rank_ip = mpi_group.comm.bcast(my_ip, mpi_group.comm.size - 1)
    return last_rank_ip == root_ip
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("ifIpPortTrio", ["eth0:localhost:50000", ethernet_interface_name, ""])
def test_group_with_ip(mpi_group: MpiGroup, ifIpPortTrio: str):
    # Bootstrap a group three ways (explicit trio, interface-only expanded to
    # root's IP, and unique-id broadcast) and do a naive all-gather over the
    # bootstrap's send/recv to validate connectivity.
    if (ethernet_interface_name in ni.interfaces()) is False:
        pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
    my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
    root_ip = mpi_group.comm.bcast(my_ip, 0)
    if ifIpPortTrio == ethernet_interface_name:
        # Interface-only form: append root's IP and a fixed port.
        ifIpPortTrio += ":" + root_ip + ":50000"  # some random port
    if all_ranks_on_the_same_node(mpi_group) is False and "localhost" in ifIpPortTrio:
        # ranks are on different nodes
        pytest.skip("this case is not supported as localhost will be different for different nodes")
    group = MscclppGroup(mpi_group, ifIpPortTrio)
    nelem = 1024
    memory = np.zeros(nelem, dtype=np.int32)
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = np.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    # Send my slice to every peer, then receive each peer's slice.
    for rank in range(group.nranks):
        if rank == group.my_rank:
            continue
        group.send(
            memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))],
            rank,
            0,
        )
    for rank in range(group.nranks):
        if rank == group.my_rank:
            continue
        group.recv(memory[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))], rank, 0)
    assert np.array_equal(memory, memory_expected)
def create_and_connect(mpi_group: MpiGroup, transport: str):
    """Build a MscclppGroup and connect to all other ranks.

    transport is "IB" or "NVLink"; NVLink is skipped when ranks span nodes.
    Returns (group, {rank: Connection}).
    """
    if transport == "NVLink" and all_ranks_on_the_same_node(mpi_group) is False:
        pytest.skip("cannot use nvlink for cross node")
    group = MscclppGroup(mpi_group)
    remote_nghrs = list(range(mpi_group.comm.size))
    remote_nghrs.remove(mpi_group.comm.rank)
    if transport == "NVLink":
        tran = Transport.CudaIpc
    elif transport == "IB":
        # Pick the IB device matching this rank's position on the node.
        tran = group.my_ib_device(group.my_rank % 8)
    else:
        assert False
    connections = group.make_connection(remote_nghrs, tran)
    return group, connections
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("transport", ["IB", "NVLink"])
def test_group_with_connections(mpi_group: MpiGroup, transport: str):
    # Smoke test: connection setup succeeds for each transport.
    create_and_connect(mpi_group, transport)
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("transport", ["IB", "NVLink"])
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
def test_connection_write(mpi_group: MpiGroup, transport: Transport, nelem: int):
    # All-gather via one-sided connection writes: every rank writes its slice
    # into each peer's registered buffer, then polls its own buffer until all
    # slices have arrived.
    group, connections = create_and_connect(mpi_group, transport)
    memory = cp.zeros(nelem, dtype=cp.int32)
    nelemPerRank = nelem // group.nranks
    sizePerRank = nelemPerRank * memory.itemsize
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()
    all_reg_memories = group.register_tensor_with_connections(memory, connections)
    for rank in connections:
        connections[rank].write(
            all_reg_memories[rank],
            sizePerRank * group.my_rank,
            all_reg_memories[group.my_rank],
            sizePerRank * group.my_rank,
            sizePerRank,
        )
    # Poll up to ~10s for the asynchronous writes to land.
    poll_for = 100
    for i in range(poll_for):
        all_correct = cp.array_equal(memory, memory_expected)
        if all_correct:
            break
        time.sleep(0.1)
    # NOTE(review): flush runs only after polling — confirm that completing
    # writes by passive polling (rather than flush-then-check) is intended.
    for conn in connections:
        connections[conn].flush()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
    assert all_correct
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("transport", ["IB", "NVLink"])
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("device", ["cuda", "cpu"])
def test_connection_write_and_signal(mpi_group: MpiGroup, transport: Transport, nelem: int, device: str):
    # this test starts with a random tensor on rank 0 and rotates it all the way through all ranks
    # and finally, comes back to rank 0 to make sure it matches all the original values
    if device == "cpu" and transport == "NVLink":
        pytest.skip("nvlink doesn't work with host allocated memory")
    group, connections = create_and_connect(mpi_group, transport)
    xp = cp if device == "cuda" else np
    if group.my_rank == 0:
        memory = xp.random.randn(nelem)
        memory = memory.astype(xp.float32)
        memory_expected = memory.copy()
    else:
        memory = xp.zeros(nelem, dtype=xp.float32)
    # One-slot buffer used by the previous rank to signal "data delivered".
    signal_memory = xp.zeros(1, dtype=xp.int64)
    all_reg_memories = group.register_tensor_with_connections(memory, connections)
    all_signal_memories = group.register_tensor_with_connections(signal_memory, connections)
    next_rank = (group.my_rank + 1) % group.nranks
    bufferSize = nelem * memory.itemsize
    dummy_memory_on_cpu = np.zeros(1, dtype=np.int64)
    signal_val = 123
    # Non-root ranks wait until the previous rank has delivered the data.
    if group.my_rank != 0:
        while signal_memory[0] != signal_val:
            time.sleep(0.1)
    # Forward the buffer to the next rank, then raise its signal flag.
    connections[next_rank].write(all_reg_memories[next_rank], 0, all_reg_memories[group.my_rank], 0, bufferSize)
    connections[next_rank].flush()
    if group.my_rank == 0:
        # Zero rank 0's copy so the final comparison proves a full round trip.
        memory[:] = 0
    connections[next_rank].update_and_sync(
        all_signal_memories[next_rank], 0, dummy_memory_on_cpu.ctypes.data, signal_val
    )
    all_correct = False
    if group.my_rank == 0:
        # Wait for the rotation to come back around, then verify.
        while signal_memory[0] != signal_val:
            time.sleep(0.1)
        all_correct = cp.array_equal(memory, memory_expected)
    group.barrier()
    all_correct = mpi_group.comm.bcast(all_correct, 0)
    assert all_correct
@parametrize_mpi_groups(2, 4, 8, 16)
def test_h2h_semaphores(mpi_group: MpiGroup):
    # Host-to-host semaphores over IB: signal every peer, then wait on all.
    group, connections = create_and_connect(mpi_group, "IB")
    semaphores = group.make_semaphore(connections, Host2HostSemaphore)
    for rank in connections:
        semaphores[rank].signal()
    for rank in connections:
        semaphores[rank].wait()
    group.barrier()
class MscclppKernel:
    """Compiles and launches one of the .cu test kernels with packed raw args.

    test_name selects the kernel; launch geometry and the packed parameter
    buffer are derived from it. Calling the instance launches the kernel.
    """

    def __init__(
        self,
        test_name,
        my_rank=None,
        nranks=None,
        semaphore_or_channels=None,
        tensor=None,
        use_packet=False,
        scratch=None,
        fifo=None,
    ):
        # Select the kernel source and launch geometry per test.
        if test_name == "h2d_semaphore":
            self._kernel = KernelBuilder(
                file="h2d_semaphore_test.cu",
                kernel_name="h2d_semaphore",
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "d2d_semaphore":
            self._kernel = KernelBuilder(
                file="d2d_semaphore_test.cu",
                kernel_name="d2d_semaphore",
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "sm_channel":
            self._kernel = KernelBuilder(
                file="sm_channel_test.cu",
                kernel_name="sm_channel",
            ).get_compiled_kernel()
            self.nblocks = nranks
            self.nthreads = 1024
        elif test_name == "fifo":
            self._kernel = KernelBuilder(
                file="fifo_test.cu",
                kernel_name="fifo",
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = 1
        elif test_name == "proxy":
            self._kernel = KernelBuilder(
                file="proxy_test.cu",
                kernel_name="proxy",
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "simple_proxy_channel":
            self._kernel = KernelBuilder(
                file="simple_proxy_channel_test.cu",
                kernel_name="simple_proxy_channel",
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = 1024
        else:
            assert False

        # Build the raw parameter buffer the kernel expects (see utils.pack).
        self.params = b""
        if test_name in ["h2d_semaphore", "d2d_semaphore", "sm_channel", "simple_proxy_channel"]:
            # Upload a rank-indexed array of device handles; the entry at
            # my_rank is zero-filled padding (never dereferenced on device).
            first_arg = next(iter(semaphore_or_channels.values()))
            size_of_semaphore_or_channels = len(first_arg.device_handle().raw)
            device_handles = []
            for rank in range(nranks):
                if rank == my_rank:
                    device_handles.append(
                        bytes(size_of_semaphore_or_channels)
                    )  # just zeros for semaphores that do not exist
                else:
                    device_handles.append(semaphore_or_channels[rank].device_handle().raw)
            # keep a reference to the device handles so that they don't get garbage collected
            self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8)
            self.params += pack(self._d_semaphore_or_channels, my_rank, nranks)
            if test_name == "sm_channel":
                self.params += pack(tensor.size, use_packet)
            if test_name == "simple_proxy_channel":
                self.params += pack(tensor, scratch, tensor.size, use_packet)
        elif test_name == "fifo":
            self.params = fifo.device_handle().raw
        elif test_name == "proxy":
            # Here semaphore_or_channels is a list and fifo is a raw handle.
            semaphore_device_handles = [semaphore.device_handle().raw for semaphore in semaphore_or_channels]
            self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(semaphore_device_handles)), dtype=cp.uint8)
            self.params = pack(my_rank, nranks) + fifo.raw + pack(self._d_semaphore_or_channels)

    def __call__(self):
        # Launch on the default stream with no shared memory.
        return self._kernel.launch_kernel(self.params, self.nblocks, self.nthreads, 0, None)
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("transport", ["NVLink", "IB"])
def test_h2d_semaphores(mpi_group: MpiGroup, transport: str):
    # The kernel waits on each peer's host-to-device semaphore; the host
    # signals them from a worker thread while the kernel is running.
    def signal(semaphores):
        for rank in semaphores:
            semaphores[rank].signal()

    group, connections = create_and_connect(mpi_group, transport)
    semaphores = group.make_semaphore(connections, Host2DeviceSemaphore)
    kernel = MscclppKernel("h2d_semaphore", group.my_rank, group.nranks, semaphores)
    kernel()
    # workaround: use a separate thread to let cudaMemcpyAsync run concurrently with the kernel
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(signal, semaphores)
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
@parametrize_mpi_groups(2, 4, 8, 16)
def test_d2d_semaphores(mpi_group: MpiGroup):
    # Device-to-device semaphores over NVLink: the kernel signals and waits on
    # each peer's semaphore in one launch.
    group, connections = create_and_connect(mpi_group, "NVLink")
    semaphores = group.make_semaphore(connections, SmDevice2DeviceSemaphore)
    group.barrier()
    kernel = MscclppKernel("d2d_semaphore", group.my_rank, group.nranks, semaphores)
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("use_packet", [False, True])
def test_sm_channels(mpi_group: MpiGroup, nelem: int, use_packet: bool):
    """All-gather over SM channels, optionally via LL packets with a 2x scratch buffer.

    Fix: removed a leftover no-op expression (`nelemPerRank * memory.itemsize`
    whose result was discarded).
    """
    group, connections = create_and_connect(mpi_group, "NVLink")
    memory = cp.zeros(nelem, dtype=cp.int32)
    if use_packet:
        # Packet buffers need 2x the payload size (payload word + flag word).
        scratch = cp.zeros(nelem * 2, dtype=cp.int32)
    else:
        scratch = None
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    if use_packet:
        channels = group.make_sm_channels_with_packet(memory, scratch, connections)
    else:
        channels = group.make_sm_channels(memory, connections)
    kernel = MscclppKernel("sm_channel", group.my_rank, group.nranks, channels, memory, use_packet, scratch)
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)
@parametrize_mpi_groups(2, 4, 8, 16)
def test_fifo(
    mpi_group: MpiGroup,
):
    # The fifo kernel pushes one trigger with fst=123; poll the host-side FIFO
    # (up to ~10s) until it shows up.
    fifo = Fifo()
    kernel = MscclppKernel("fifo", fifo=fifo)
    kernel()
    poll_for = 100
    for _ in range(poll_for):
        trigger = fifo.poll()
        if trigger.fst == 123:
            return
        time.sleep(0.1)
    assert False
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("transport", ["IB", "NVLink"])
def test_proxy(
    mpi_group: MpiGroup,
    nelem: int,
    transport: str,
):
    """All-gather driven by the C++ MyProxyService: the kernel pushes one FIFO
    trigger, the proxy broadcasts this rank's slice and signals the peers'
    host-to-device semaphores.

    Fix: removed a leftover no-op expression (`nelemPerRank * memory.itemsize`
    whose result was discarded).
    """
    group, connections = create_and_connect(mpi_group, transport)
    memory = cp.zeros(
        nelem,
        dtype=cp.int32,
    )
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()
    all_reg_memories = group.register_tensor_with_connections(memory, connections)
    semaphores = group.make_semaphore(connections, Host2DeviceSemaphore)
    # MyProxyService wants rank-indexed lists; fill this rank's own slots with
    # any valid object (never used) just for simplicity of indexing.
    list_conn = []
    list_sem = []
    list_reg_mem = []
    first_conn = next(iter(connections.values()))
    first_sem = next(iter(semaphores.values()))
    for rank in range(group.nranks):
        if rank in connections:
            list_conn.append(connections[rank])
            list_sem.append(semaphores[rank])
        else:
            list_conn.append(first_conn)  # just for simplicity of indexing
            list_sem.append(first_sem)
        list_reg_mem.append(all_reg_memories[rank])
    proxy = _ext.MyProxyService(
        group.my_rank,
        group.nranks,
        nelem * memory.itemsize,
        list_conn,
        list_reg_mem,
        list_sem,
    )
    fifo_device_handle = proxy.fifo_device_handle()
    kernel = MscclppKernel(
        "proxy",
        my_rank=group.my_rank,
        nranks=group.nranks,
        semaphore_or_channels=list_sem,
        fifo=fifo_device_handle,
    )
    proxy.start()
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    proxy.stop()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("transport", ["NVLink", "IB"])
@pytest.mark.parametrize("use_packet", [False, True])
def test_simple_proxy_channel(
    mpi_group: MpiGroup,
    nelem: int,
    transport: str,
    use_packet: bool,
):
    """All-gather over SimpleProxyChannels, optionally via LL packets staged in
    a scratch buffer.

    Fix: removed a leftover no-op expression (`nelemPerRank * memory.itemsize`
    whose result was discarded).
    """
    group, connections = create_and_connect(mpi_group, transport)
    memory = cp.zeros(nelem, dtype=cp.int32)
    if use_packet:
        scratch = cp.zeros(nelem * 2, dtype=cp.int32)
    else:
        scratch = cp.zeros(1, dtype=cp.int32)  # just so that we can pass a valid ptr
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()
    proxy_service = ProxyService()
    # The packet path transfers through the scratch buffer, so that is what
    # must be registered with peers; otherwise register the data itself.
    if use_packet:
        memory_to_register = scratch
    else:
        memory_to_register = memory
    simple_channels = group.make_proxy_channels_with_packet(proxy_service, memory_to_register, connections)
    kernel = MscclppKernel(
        "simple_proxy_channel",
        my_rank=group.my_rank,
        nranks=group.nranks,
        semaphore_or_channels=simple_channels,
        tensor=memory,
        use_packet=use_packet,
        scratch=scratch,
    )
    proxy_service.start_proxy()
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    proxy_service.stop_proxy()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)

122
python/test/utils.py Normal file
View File

@@ -0,0 +1,122 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import ctypes
import os
import struct
import subprocess
import tempfile
from typing import Type
from cuda import cuda, nvrtc, cudart
import cupy as cp
import numpy as np
def _check_cuda_errors(result):
if result[0].value:
raise RuntimeError(f"CUDA error code={result[0].value}({_cuda_get_error(result[0])})")
if len(result) == 1:
return None
elif len(result) == 2:
return result[1]
else:
return result[1:]
def _cuda_get_error(error):
    """Translate a cuda / cudart / nvrtc error enum into a readable name."""
    if isinstance(error, cuda.CUresult):
        err, name = cuda.cuGetErrorName(error)
        if err == cuda.CUresult.CUDA_SUCCESS:
            return name
        return "<unknown>"
    if isinstance(error, cudart.cudaError_t):
        return cudart.cudaGetErrorName(error)[1]
    if isinstance(error, nvrtc.nvrtcResult):
        return nvrtc.nvrtcGetErrorString(error)[1]
    raise RuntimeError("Unknown error type: {}".format(error))
class Kernel:
    """A CUDA module loaded from PTX plus one kernel function, launched with a
    pre-packed raw argument buffer (see utils.pack)."""

    def __init__(self, ptx: bytes, kernel_name: str, device_id: int):
        # NOTE(review): device_id is accepted but unused here — the current
        # CUDA context is used instead; confirm whether it should be.
        self._context = _check_cuda_errors(cuda.cuCtxGetCurrent())
        assert self._context is not None
        self._module = _check_cuda_errors(cuda.cuModuleLoadData(ptx))
        self._kernel = _check_cuda_errors(cuda.cuModuleGetFunction(self._module, kernel_name.encode()))

    def launch_kernel(
        self,
        params: bytes,
        nblocks: int,
        nthreads: int,
        shared: int,
        stream: Type[cuda.CUstream] or Type[cudart.cudaStream_t],
    ):
        # Pass the whole packed argument buffer through
        # CU_LAUNCH_PARAM_BUFFER_POINTER/SIZE rather than per-arg pointers.
        buffer = (ctypes.c_byte * len(params)).from_buffer_copy(params)
        buffer_size = ctypes.c_size_t(len(params))
        config = np.array(
            [
                cuda.CU_LAUNCH_PARAM_BUFFER_POINTER,
                ctypes.addressof(buffer),
                cuda.CU_LAUNCH_PARAM_BUFFER_SIZE,
                ctypes.addressof(buffer_size),
                cuda.CU_LAUNCH_PARAM_END,
            ],
            dtype=np.uint64,
        )
        _check_cuda_errors(
            cuda.cuLaunchKernel(self._kernel, nblocks, 1, 1, nthreads, 1, 1, shared, stream, 0, config.ctypes.data)
        )

    def __del__(self):
        # Best-effort module unload at GC time.
        cuda.cuModuleUnload(self._module)
class KernelBuilder:
    """Compiles a .cu file (relative to this directory) to PTX with nvcc for
    the current GPU's compute capability and wraps it in a Kernel."""

    def __init__(self, file: str, kernel_name: str):
        # PTX output lives in a temp dir cleaned up in __del__.
        self._tempdir = tempfile.TemporaryDirectory()
        self._current_file_dir = os.path.dirname(os.path.abspath(__file__))
        device_id = cp.cuda.Device().id
        ptx = self._compile_cuda(os.path.join(self._current_file_dir, file), f"{kernel_name}.ptx", device_id)
        self._kernel = Kernel(ptx, kernel_name, device_id)

    def _compile_cuda(self, source_file, output_file, device_id, std_version="c++17"):
        # Uses the in-tree mscclpp headers and targets the device's exact
        # compute capability.
        include_dir = os.path.join(self._current_file_dir, "../../include")
        major = _check_cuda_errors(
            cudart.cudaDeviceGetAttribute(cudart.cudaDeviceAttr.cudaDevAttrComputeCapabilityMajor, device_id)
        )
        minor = _check_cuda_errors(
            cudart.cudaDeviceGetAttribute(cudart.cudaDeviceAttr.cudaDevAttrComputeCapabilityMinor, device_id)
        )
        command = (
            f"nvcc -std={std_version} -ptx -Xcompiler -Wall,-Wextra -I{include_dir} {source_file} "
            f"--gpu-architecture=compute_{major}{minor} --gpu-code=sm_{major}{minor},compute_{major}{minor} -o {self._tempdir.name}/{output_file}"
        )
        try:
            subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            with open(f"{self._tempdir.name}/{output_file}", "rb") as f:
                return f.read()
        except subprocess.CalledProcessError as e:
            # Surface nvcc's stderr and the exact command for debugging.
            raise RuntimeError("Compilation failed:", e.stderr.decode(), command)

    def get_compiled_kernel(self):
        return self._kernel

    def __del__(self):
        self._tempdir.cleanup()
def pack(*args):
    """Pack kernel arguments into a raw byte buffer for cuLaunchKernel.

    bool/int become 32-bit ints; numpy/cupy arrays become 64-bit pointers to
    their data. Raises RuntimeError for any other type.

    Fix: the bool check must come BEFORE the int check — bool is a subclass of
    int, so the previous ordering made the bool branch unreachable (the packed
    bytes were the same, but the documented intent was dead code).
    """
    res = b""
    for arg in list(args):
        # use int to represent bool, which can avoid CUDA_ERROR_LAUNCH_OUT_OF_RESOURCES error
        if isinstance(arg, bool):
            res += struct.pack("i", arg)
        elif isinstance(arg, int):
            res += struct.pack("i", arg)
        elif isinstance(arg, np.ndarray):
            res += struct.pack("P", arg.ctypes.data)
        elif isinstance(arg, cp.ndarray):
            res += struct.pack("P", arg.data.ptr)
        else:
            raise RuntimeError(f"Unsupported type: {type(arg)}")
    return res