Files
mscclpp/python/test/test_mscclpp.py
Binyang Li 184dcbf9d7 Add CI pipeline for no-IB environment testing (#755)
## Summary

Add CI pipeline support for testing in environments without InfiniBand
(IB) hardware.

## Changes

### IB stubs for no-IB builds (`src/core/ib.cc`)
- Added stub implementations for `IbMr` and `IbQp` classes in the `#else
// !defined(USE_IBVERBS)` block so the library links successfully when
built with `-DMSCCLPP_USE_IB=OFF`.

### Environment variable to disable IB tests
(`MSCCLPP_DISABLE_IB_TESTS`)
- Added `disableIbTests` field to the `Env` class
(`include/mscclpp/env.hpp`, `src/core/env.cpp`), reading from
`MSCCLPP_DISABLE_IB_TESTS` env var.
- Exposed as `disable_ib_tests` in Python bindings
(`python/csrc/env_py.cpp`).
- Updated `python/test/test_mscclpp.py` to skip IB-dependent tests
(`create_group_and_connection` with IB transport, `test_h2h_semaphores`,
`test_h2h_semaphores_gil_release`) when `env().disable_ib_tests` is
true.

### CI pipeline (`ut-no-ib-env.yaml`, `ut.yml`)
The no-IB environment pipeline runs two phases:

1. **No-IB build phase**: Build with `-DMSCCLPP_USE_IB=OFF`, deploy, run
unit tests, multi-process unit tests, and pytests (with
`MSCCLPP_DISABLE_IB_TESTS=1`).
2. **IB build phase**: Rebuild with IB enabled (default), stop the
existing container, redeploy, and run pytests (with
`MSCCLPP_DISABLE_IB_TESTS=1`) — verifying that the full IB-enabled build
works correctly in a non-IB environment when IB tests are skipped.

Also increased the job timeout from 40 to 60 minutes to accommodate the
two-phase pipeline.
2026-02-24 15:55:59 -08:00

700 lines
26 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from concurrent.futures import ThreadPoolExecutor
import os
import time
import threading
import cupy as cp
import numpy as np
import netifaces as ni
import pytest
from mscclpp import (
ErrorCode,
DataType,
EndpointConfig,
ExecutionPlan,
Executor,
Fifo,
Host2DeviceSemaphore,
Host2HostSemaphore,
ProxyService,
MemoryDevice2DeviceSemaphore,
TcpBootstrap,
Transport,
is_nvls_supported,
npkit,
env,
Device,
DeviceType,
)
from mscclpp import CommGroup, GpuBuffer
from mscclpp.utils import KernelBuilder, pack
from ._cpp import _ext
from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
ethernet_interface_name = "eth0"
@parametrize_mpi_groups(1)
def test_env(mpi_group: MpiGroup):
e = env()
assert isinstance(e.debug, str)
with pytest.raises(AttributeError):
# all attributes should be read-only
e.debug = "INFO"
# should be the same object
e2 = env()
assert e == e2
def all_ranks_on_the_same_node(mpi_group: MpiGroup):
if (ethernet_interface_name in ni.interfaces()) is False:
pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
root_ip = mpi_group.comm.bcast(my_ip, 0)
last_rank_ip = mpi_group.comm.bcast(my_ip, mpi_group.comm.size - 1)
return last_rank_ip == root_ip
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("ifIpPortTrio", [f"{ethernet_interface_name}:localhost:50000", ethernet_interface_name, ""])
def test_group_with_ip(mpi_group: MpiGroup, ifIpPortTrio: str):
if (ethernet_interface_name in ni.interfaces()) is False:
pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
root_ip = mpi_group.comm.bcast(my_ip, 0)
if ifIpPortTrio == ethernet_interface_name:
ifIpPortTrio += ":" + root_ip + ":50000" # some random port
if all_ranks_on_the_same_node(mpi_group) is False and "localhost" in ifIpPortTrio:
# ranks are on different nodes
pytest.skip("this case is not supported as localhost will be different for different nodes")
group = CommGroup(mpi_group.comm, ifIpPortTrio)
nelem = 1024
memory = np.zeros(nelem, dtype=np.int32)
nelemPerRank = nelem // group.nranks
memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
memory_expected = np.zeros_like(memory)
for rank in range(group.nranks):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
for rank in range(group.nranks):
if rank == group.my_rank:
continue
group.send(memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))], rank, 0)
for rank in range(group.nranks):
if rank == group.my_rank:
continue
group.recv(memory[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))], rank, 0)
assert np.array_equal(memory, memory_expected)
@parametrize_mpi_groups(2, 4, 8, 16)
def test_bootstrap_init_gil_release(mpi_group: MpiGroup):
bootstrap = TcpBootstrap.create(mpi_group.comm.rank, mpi_group.comm.size)
uniq_id = None
if mpi_group.comm.rank == 0:
# similar to NCCL's unique id
uniq_id = bootstrap.create_unique_id()
uniq_id_global = mpi_group.comm.bcast(uniq_id, 0)
if mpi_group.comm.rank == 0:
# rank 0 never initializes the bootstrap, making other ranks block
pass
else:
check_list = []
def check_target():
check_list.append("this thread could run.")
def init_target():
try:
# expected to raise a timeout after 3 seconds
bootstrap.initialize(uniq_id_global, 3)
except:
pass
init_thread = threading.Thread(target=init_target)
check_thread = threading.Thread(target=check_target)
init_thread.start()
time.sleep(0.1)
# check that the check thread is not blocked
s = time.time()
check_thread.start()
check_thread.join()
e = time.time()
assert e - s < 0.1
assert len(check_list) == 1
init_thread.join()
mpi_group.comm.barrier()
def create_connection(group: CommGroup, connection_type: str):
if connection_type == "NVLS":
all_ranks = list(range(group.nranks))
tran = Transport.CudaIpc
connection = group.make_connection(all_ranks, tran, use_switch=True)
return connection
remote_nghrs = list(range(group.nranks))
remote_nghrs.remove(group.my_rank)
if connection_type == "NVLink":
tran = Transport.CudaIpc
elif connection_type == "IB":
tran = group.my_ib_device(group.my_rank % 8)
else:
assert False
connections = group.make_connection(remote_nghrs, tran)
return connections
def create_group_and_connection(mpi_group: MpiGroup, connection_type: str):
if (connection_type == "NVLink" or connection_type == "NVLS") and all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("cannot use nvlink/nvls for cross node")
if connection_type == "IB" and os.environ.get("MSCCLPP_DISABLE_IB_TESTS", "0") != "0":
pytest.skip("IB tests are disabled via MSCCLPP_DISABLE_IB_TESTS=1")
group = CommGroup(mpi_group.comm)
connection = create_connection(group, connection_type)
return group, connection
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("transport", ["IB", "NVLink"])
def test_group_with_connections(mpi_group: MpiGroup, transport: str):
create_group_and_connection(mpi_group, transport)
@parametrize_mpi_groups(1)
@pytest.mark.parametrize("nelem", [2**i for i in [0, 10, 15, 20]])
@pytest.mark.parametrize("dtype", [cp.float32, cp.float16])
def test_gpu_buffer(mpi_group: MpiGroup, nelem: int, dtype: cp.dtype):
memory = GpuBuffer(nelem, dtype=dtype)
assert memory.shape == (nelem,)
assert memory.dtype == dtype
assert memory.itemsize == cp.dtype(dtype).itemsize
assert memory.nbytes == nelem * cp.dtype(dtype).itemsize
assert memory.data.ptr != 0
assert memory.data.mem.ptr != 0
assert memory.data.mem.size >= nelem * cp.dtype(dtype).itemsize
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("connection_type", ["IB", "NVLink"])
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
def test_connection_write(mpi_group: MpiGroup, connection_type: str, nelem: int):
group, connections = create_group_and_connection(mpi_group, connection_type)
memory = GpuBuffer(nelem, dtype=cp.int32)
nelemPerRank = nelem // group.nranks
sizePerRank = nelemPerRank * memory.itemsize
memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
memory_expected = cp.zeros_like(memory)
for rank in range(group.nranks):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
group.barrier()
all_reg_memories = group.register_tensor_with_connections(memory, connections)
for rank in connections:
connections[rank].write(
all_reg_memories[rank],
sizePerRank * group.my_rank,
all_reg_memories[group.my_rank],
sizePerRank * group.my_rank,
sizePerRank,
)
poll_for = 100
for i in range(poll_for):
all_correct = cp.array_equal(memory, memory_expected)
if all_correct:
break
time.sleep(0.1)
for conn in connections:
connections[conn].flush()
cp.cuda.runtime.deviceSynchronize()
group.barrier()
assert all_correct
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("connection_type", ["IB", "NVLink"])
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20, 27]])
@pytest.mark.parametrize("device", ["cuda", "cpu"])
def test_connection_write_and_signal(mpi_group: MpiGroup, connection_type: str, nelem: int, device: str):
# this test starts with a random tensor on rank 0 and rotates it all the way through all ranks
# and finally, comes back to rank 0 to make sure it matches all the original values
if device == "cpu" and connection_type == "NVLink":
pytest.skip("nvlink doesn't work with host allocated memory")
group, connections = create_group_and_connection(mpi_group, connection_type)
xp = cp if device == "cuda" else np
if group.my_rank == 0:
memory = xp.random.randn(nelem)
memory = memory.astype(xp.float32)
memory_expected = memory.copy()
else:
memory = xp.zeros(nelem, dtype=xp.float32)
if device == "cuda":
cp.cuda.runtime.deviceSynchronize()
signal_memory = xp.zeros(1, dtype=xp.int64)
all_reg_memories = group.register_tensor_with_connections(memory, connections)
all_signal_memories = group.register_tensor_with_connections(signal_memory, connections)
next_rank = (group.my_rank + 1) % group.nranks
bufferSize = nelem * memory.itemsize
dummy_memory_on_cpu = np.zeros(1, dtype=np.int64)
signal_val = 123
if group.my_rank != 0:
while signal_memory[0] != signal_val:
time.sleep(0.1)
connections[next_rank].write(all_reg_memories[next_rank], 0, all_reg_memories[group.my_rank], 0, bufferSize)
connections[next_rank].flush()
if group.my_rank == 0:
memory[:] = 0
if device == "cuda":
cp.cuda.runtime.deviceSynchronize()
connections[next_rank].update_and_sync(
all_signal_memories[next_rank], 0, dummy_memory_on_cpu.ctypes.data, signal_val
)
all_correct = False
if group.my_rank == 0:
while signal_memory[0] != signal_val:
time.sleep(0.1)
all_correct = cp.array_equal(memory, memory_expected)
group.barrier()
all_correct = mpi_group.comm.bcast(all_correct, 0)
assert all_correct
@parametrize_mpi_groups(2, 4, 8, 16)
def test_h2h_semaphores(mpi_group: MpiGroup):
if os.environ.get("MSCCLPP_DISABLE_IB_TESTS", "0") != "0":
pytest.skip("IB tests are disabled via MSCCLPP_DISABLE_IB_TESTS=1")
group = CommGroup(mpi_group.comm)
tran = group.my_ib_device(group.my_rank % 8)
endpoint = EndpointConfig(tran, Device(DeviceType.CPU))
remote_nghrs = list(range(group.nranks))
remote_nghrs.remove(group.my_rank)
connections = {rank: group.communicator.connect(endpoint, rank) for rank in remote_nghrs}
connections = {rank: conn.get() for rank, conn in connections.items()}
semaphores = group.make_semaphores(connections)
semaphores = {rank: Host2HostSemaphore(sema) for rank, sema in semaphores.items()}
for rank in connections:
semaphores[rank].signal()
for rank in connections:
semaphores[rank].wait()
group.barrier()
@parametrize_mpi_groups(2, 4, 8, 16)
def test_h2h_semaphores_gil_release(mpi_group: MpiGroup):
if os.environ.get("MSCCLPP_DISABLE_IB_TESTS", "0") != "0":
pytest.skip("IB tests are disabled via MSCCLPP_DISABLE_IB_TESTS=1")
group = CommGroup(mpi_group.comm)
tran = group.my_ib_device(group.my_rank % 8)
endpoint = EndpointConfig(tran, Device(DeviceType.CPU))
remote_nghrs = list(range(group.nranks))
remote_nghrs.remove(group.my_rank)
connections = {rank: group.communicator.connect(endpoint, rank) for rank in remote_nghrs}
connections = {rank: conn.get() for rank, conn in connections.items()}
semaphores = group.make_semaphores(connections)
semaphores = {rank: Host2HostSemaphore(sema) for rank, sema in semaphores.items()}
def target_wait(sems, conns):
for rank in conns:
sems[rank].wait(-1)
def target_signal(sems, conns):
# sleep 1 sec to let target_wait() starts a bit earlier
time.sleep(1)
# if wait() doesn't release GIL, this will block forever
for rank in conns:
sems[rank].signal()
wait_thread = threading.Thread(target=target_wait, args=(semaphores, connections))
signal_thread = threading.Thread(target=target_signal, args=(semaphores, connections))
wait_thread.start()
signal_thread.start()
signal_thread.join()
wait_thread.join()
group.barrier()
@parametrize_mpi_groups(8)
@pytest.mark.skipif(is_nvls_supported() is False, reason="NVLS is not supported")
def test_nvls_connection(mpi_group: MpiGroup):
if all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("cannot use nvls for cross node")
group = CommGroup(mpi_group.comm)
all_ranks = list(range(group.nranks))
nvls_connection = group.make_connection(all_ranks, Transport.CudaIpc, use_switch=True)
memory1 = GpuBuffer(2**29, cp.int8)
memory2 = GpuBuffer(2**29, cp.int8)
memory3 = GpuBuffer(2**29, cp.int8)
mem_handle1 = nvls_connection.bind_allocated_memory(memory1.data.ptr, memory1.data.mem.size)
mem_handle2 = nvls_connection.bind_allocated_memory(memory2.data.ptr, memory2.data.mem.size)
with pytest.raises(Exception):
mem_handle3 = nvls_connection.bind_allocated_memory(memory3.data.ptr, memory3.data.mem.size)
# the memory is freed on the destructor of mem_handle2
mem_handle2 = None
mem_handle3 = nvls_connection.bind_allocated_memory(memory3.data.ptr, memory3.data.mem.size)
class MscclppKernel:
def __init__(
self,
test_name,
my_rank=None,
nranks=None,
semaphore_or_channels=None,
tensor=None,
use_packet=False,
scratch=None,
fifo=None,
nvls_mem_handle=None,
nvls_buffer_size=None,
):
file_dir = os.path.dirname(os.path.abspath(__file__))
if test_name == "h2d_semaphore":
self._kernel = KernelBuilder(
file="h2d_semaphore_test.cu", kernel_name="h2d_semaphore", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "d2d_semaphore":
self._kernel = KernelBuilder(
file="d2d_semaphore_test.cu", kernel_name="d2d_semaphore", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "memory_channel":
self._kernel = KernelBuilder(
file="memory_channel_test.cu", kernel_name="memory_channel", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = nranks
self.nthreads = 1024
elif test_name == "fifo":
self._kernel = KernelBuilder(
file="fifo_test.cu", kernel_name="fifo", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = 1
elif test_name == "proxy":
self._kernel = KernelBuilder(
file="proxy_test.cu", kernel_name="proxy", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "port_channel":
self._kernel = KernelBuilder(
file="port_channel_test.cu", kernel_name="port_channel", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = 1024
elif test_name == "nvls":
self._kernel = KernelBuilder(
file="nvls_test.cu", kernel_name="nvls_test", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 64
self.nthreads = 1024
else:
assert False
self.params = b""
if semaphore_or_channels != None:
first_arg = next(iter(semaphore_or_channels.values()))
size_of_semaphore_or_channels = len(first_arg.device_handle().raw)
device_handles = []
for rank in range(nranks):
if rank == my_rank:
device_handles.append(
bytes(size_of_semaphore_or_channels)
) # just zeros for semaphores that do not exist
else:
device_handles.append(semaphore_or_channels[rank].device_handle().raw)
# keep a reference to the device handles so that they don't get garbage collected
self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8)
if test_name in ["h2d_semaphore", "d2d_semaphore", "memory_channel", "port_channel"]:
self.params += pack(self._d_semaphore_or_channels, my_rank, nranks)
if test_name == "memory_channel":
self.params += pack(tensor.size, use_packet)
if test_name == "port_channel":
self.params += pack(tensor, scratch, tensor.size, use_packet)
elif test_name == "fifo":
self.params = fifo.device_handle().raw
elif test_name == "proxy":
self.params = pack(my_rank, nranks) + fifo.raw + pack(self._d_semaphore_or_channels)
elif test_name == "nvls":
self.params = (
nvls_mem_handle.device_handle().raw
+ pack(self._d_semaphore_or_channels)
+ pack(my_rank, nranks, nvls_buffer_size)
)
def __call__(self):
return self._kernel.launch_kernel(self.params, self.nblocks, self.nthreads, 0, None)
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("connection_type", ["NVLink", "IB"])
def test_h2d_semaphores(mpi_group: MpiGroup, connection_type: str):
def signal(semaphores):
for rank in semaphores:
semaphores[rank].signal()
group, connections = create_group_and_connection(mpi_group, connection_type)
semaphores = group.make_semaphores(connections)
semaphores = {rank: Host2DeviceSemaphore(sema) for rank, sema in semaphores.items()}
kernel = MscclppKernel("h2d_semaphore", group.my_rank, group.nranks, semaphores)
kernel()
# workaround: use a separate thread to to let cudaMemcpyAsync run concurrently with the kernel
with ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(signal, semaphores)
cp.cuda.runtime.deviceSynchronize()
group.barrier()
@parametrize_mpi_groups(2, 4, 8, 16)
def test_d2d_semaphores(mpi_group: MpiGroup):
group, connections = create_group_and_connection(mpi_group, "NVLink")
semaphores = group.make_semaphores(connections)
semaphores = {rank: MemoryDevice2DeviceSemaphore(sema) for rank, sema in semaphores.items()}
group.barrier()
kernel = MscclppKernel("d2d_semaphore", group.my_rank, group.nranks, semaphores)
kernel()
cp.cuda.runtime.deviceSynchronize()
group.barrier()
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("use_packet", [False, True])
def test_memory_channels(mpi_group: MpiGroup, nelem: int, use_packet: bool):
group, connections = create_group_and_connection(mpi_group, "NVLink")
memory = GpuBuffer(nelem, dtype=cp.int32)
if use_packet:
scratch = GpuBuffer(nelem * 2, dtype=cp.int32)
else:
scratch = None
nelemPerRank = nelem // group.nranks
memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
memory_expected = cp.zeros_like(memory)
for rank in range(group.nranks):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
if use_packet:
registered_scratch_memory = group.register_local_memory(scratch, connections)
channels = group.make_memory_channels_with_scratch(memory, registered_scratch_memory, connections)
else:
channels = group.make_memory_channels(memory, connections)
kernel = MscclppKernel("memory_channel", group.my_rank, group.nranks, channels, memory, use_packet, scratch)
group.barrier()
kernel()
cp.cuda.runtime.deviceSynchronize()
group.barrier()
assert cp.array_equal(memory, memory_expected)
@parametrize_mpi_groups(2, 4, 8, 16)
def test_fifo(
mpi_group: MpiGroup,
):
fifo = Fifo()
kernel = MscclppKernel("fifo", fifo=fifo)
kernel()
poll_for = 100
for _ in range(poll_for):
trigger = fifo.poll()
if trigger.fst == 123:
return
time.sleep(0.1)
assert False
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("connection_type", ["IB", "NVLink"])
def test_proxy(mpi_group: MpiGroup, nelem: int, connection_type: str):
group, connections = create_group_and_connection(mpi_group, connection_type)
memory = GpuBuffer(nelem, dtype=cp.int32)
nelemPerRank = nelem // group.nranks
nelemPerRank * memory.itemsize
memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
memory_expected = cp.zeros_like(memory)
for rank in range(group.nranks):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
group.barrier()
all_reg_memories = group.register_tensor_with_connections(memory, connections)
semaphores = group.make_semaphores(connections)
list_sem = []
list_reg_mem = []
first_sem = next(iter(semaphores.values()))
for rank in range(group.nranks):
if rank in connections:
list_sem.append(semaphores[rank])
else:
list_sem.append(first_sem)
list_reg_mem.append(all_reg_memories[rank])
proxy = _ext.MyProxyService(group.my_rank, group.nranks, nelem * memory.itemsize, list_reg_mem, list_sem)
fifo_device_handle = proxy.fifo_device_handle()
kernel = MscclppKernel(
"proxy",
my_rank=group.my_rank,
nranks=group.nranks,
semaphore_or_channels={rank: Host2DeviceSemaphore(sema) for rank, sema in semaphores.items()},
fifo=fifo_device_handle,
)
proxy.start()
group.barrier()
kernel()
cp.cuda.runtime.deviceSynchronize()
proxy.stop()
group.barrier()
assert cp.array_equal(memory, memory_expected)
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("connection_type", ["NVLink", "IB"])
@pytest.mark.parametrize("use_packet", [False, True])
def test_port_channel(mpi_group: MpiGroup, nelem: int, connection_type: str, use_packet: bool):
group, connections = create_group_and_connection(mpi_group, connection_type)
memory = GpuBuffer(nelem, dtype=cp.int32)
if use_packet:
scratch = GpuBuffer(nelem * 2, dtype=cp.int32)
else:
scratch = GpuBuffer(1, dtype=cp.int32) # just so that we can pass a valid ptr
nelemPerRank = nelem // group.nranks
nelemPerRank * memory.itemsize
memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
memory_expected = cp.zeros_like(memory)
for rank in range(group.nranks):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
group.barrier()
proxy_service = ProxyService()
if use_packet:
memory_to_register = scratch
else:
memory_to_register = memory
channels = group.make_port_channels(proxy_service, memory_to_register, connections)
kernel = MscclppKernel(
"port_channel",
my_rank=group.my_rank,
nranks=group.nranks,
semaphore_or_channels=channels,
tensor=memory,
use_packet=use_packet,
scratch=scratch,
)
proxy_service.start_proxy()
group.barrier()
kernel()
cp.cuda.runtime.deviceSynchronize()
proxy_service.stop_proxy()
group.barrier()
assert cp.array_equal(memory, memory_expected)
@parametrize_mpi_groups(4, 8)
@pytest.mark.skipif(is_nvls_supported() is False, reason="NVLS is not supported")
def test_nvls(mpi_group: MpiGroup):
group, nvls_connection = create_group_and_connection(mpi_group, "NVLS")
memory = GpuBuffer(2**21, dtype=cp.int8)
nbytes = 2**21
mem_handle = nvls_connection.bind_allocated_memory(memory.data.ptr, memory.data.mem.size)
nvlinks_connections = create_connection(group, "NVLink")
semaphores = group.make_semaphores(nvlinks_connections)
semaphores = {rank: MemoryDevice2DeviceSemaphore(sema) for rank, sema in semaphores.items()}
kernel = MscclppKernel(
"nvls",
my_rank=group.my_rank,
nranks=group.nranks,
nvls_mem_handle=mem_handle,
nvls_buffer_size=nbytes,
semaphore_or_channels=semaphores,
)
kernel()
cp.cuda.runtime.deviceSynchronize()
group.barrier()
@parametrize_mpi_groups(2)
@pytest.mark.parametrize("filename", ["allreduce.json", "allreduce_packet.json"])
def test_executor(mpi_group: MpiGroup, filename: str):
if all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("algo not support cross node")
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mscclpp_group = CommGroup(mpi_group.comm)
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = env().npkit_dump_dir
if npkit_dump_dir != "":
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(
os.path.join(project_dir, "test", "execution-files", filename), mscclpp_group.my_rank
)
nelems = 1024 * 1024
cp.random.seed(42)
buffer = cp.random.random(nelems).astype(cp.float16)
sub_arrays = cp.split(buffer, mpi_group.comm.size)
nelems_per_rank = int(nelems / mpi_group.comm.size)
sendbuf = cp.empty(nelems_per_rank).astype(cp.float16)
for i in range(nelems_per_rank):
sendbuf[i] = sub_arrays[mpi_group.comm.rank][i]
expected = cp.zeros_like(sendbuf)
for i in range(mpi_group.comm.size):
expected += sub_arrays[i]
mscclpp_group.barrier()
stream = cp.cuda.Stream(non_blocking=True)
executor.execute(
mpi_group.comm.rank,
sendbuf.data.ptr,
sendbuf.data.ptr,
sendbuf.nbytes,
sendbuf.nbytes,
DataType.float16,
execution_plan,
stream.ptr,
)
stream.synchronize()
assert cp.allclose(sendbuf, expected, atol=1e-3 * mpi_group.comm.size)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()