# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from concurrent.futures import ThreadPoolExecutor
import time

import cupy as cp
import numpy as np
import netifaces as ni
import pytest

from mscclpp import (
    Fifo,
    Host2DeviceSemaphore,
    Host2HostSemaphore,
    ProxyService,
    SmDevice2DeviceSemaphore,
    Transport,
    get_ib_device_count,
)
from ._cpp import _ext
from .mscclpp_group import MscclppGroup
from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
from .utils import KernelBuilder, pack

ethernet_interface_name = "eth0"

skipif_ib = pytest.mark.skipif(get_ib_device_count() == 0, reason="no IB device")
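

# Wraps pytest.mark.parametrize("transport", ...) so that "IB" cases are
# automatically skipped on machines without an InfiniBand device.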
def parametrize_transport(*transports: str):
    def decorator(func):
        params = []
        for transport in transports:
            if transport == "IB":
                params.append(pytest.param(transport, marks=skipif_ib))
            else:
                params.append(transport)
        return pytest.mark.parametrize("transport", params)(func)

    return decorator


def all_ranks_on_the_same_node(mpi_group: MpiGroup):
    if ethernet_interface_name not in ni.interfaces():
        pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
    my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
    root_ip = mpi_group.comm.bcast(my_ip, 0)
    last_rank_ip = mpi_group.comm.bcast(my_ip, mpi_group.comm.size - 1)
    return last_rank_ip == root_ip
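

# The bootstrap argument is an "interface:ip:port" trio (e.g. "eth0:localhost:50000");
# an empty string presumably falls back to a default chosen by MscclppGroup.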
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("ifIpPortTrio", ["eth0:localhost:50000", ethernet_interface_name, ""])
def test_group_with_ip(mpi_group: MpiGroup, ifIpPortTrio: str):
    if ethernet_interface_name not in ni.interfaces():
        pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
    my_ip = ni.ifaddresses(ethernet_interface_name)[ni.AF_INET][0]["addr"]
    root_ip = mpi_group.comm.bcast(my_ip, 0)
    if ifIpPortTrio == ethernet_interface_name:
        ifIpPortTrio += ":" + root_ip + ":50000"  # some arbitrary port
    if not all_ranks_on_the_same_node(mpi_group) and "localhost" in ifIpPortTrio:
        # ranks are on different nodes; localhost would resolve differently on each
        pytest.skip("this case is not supported as localhost will be different for different nodes")
    group = MscclppGroup(mpi_group, ifIpPortTrio)

    nelem = 1024
    memory = np.zeros(nelem, dtype=np.int32)
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = np.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1

    # all-gather via point-to-point sends and receives
    for rank in range(group.nranks):
        if rank == group.my_rank:
            continue
        group.send(memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))], rank, 0)
    for rank in range(group.nranks):
        if rank == group.my_rank:
            continue
        group.recv(memory[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))], rank, 0)
    assert np.array_equal(memory, memory_expected)
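

# Shared setup: builds an MscclppGroup and connects this rank to every peer
# over the requested transport (CUDA IPC for NVLink, or an IB device).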
def create_and_connect(mpi_group: MpiGroup, transport: str):
    if transport == "NVLink" and not all_ranks_on_the_same_node(mpi_group):
        pytest.skip("cannot use NVLink across nodes")
    group = MscclppGroup(mpi_group)

    remote_nghrs = list(range(mpi_group.comm.size))
    remote_nghrs.remove(mpi_group.comm.rank)
    if transport == "NVLink":
        tran = Transport.CudaIpc
    elif transport == "IB":
        tran = group.my_ib_device(group.my_rank % 8)
    else:
        assert False
    connections = group.make_connection(remote_nghrs, tran)
    return group, connections


@parametrize_mpi_groups(2, 4, 8, 16)
@parametrize_transport("IB", "NVLink")
def test_group_with_connections(mpi_group: MpiGroup, transport: str):
    create_and_connect(mpi_group, transport)


@parametrize_mpi_groups(2, 4, 8, 16)
@parametrize_transport("IB", "NVLink")
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
def test_connection_write(mpi_group: MpiGroup, transport: str, nelem: int):
    group, connections = create_and_connect(mpi_group, transport)
    memory = cp.zeros(nelem, dtype=cp.int32)
    nelemPerRank = nelem // group.nranks
    sizePerRank = nelemPerRank * memory.itemsize
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()
    all_reg_memories = group.register_tensor_with_connections(memory, connections)

    # each rank writes its own slice into every peer's registered buffer
    for rank in connections:
        connections[rank].write(
            all_reg_memories[rank],
            sizePerRank * group.my_rank,
            all_reg_memories[group.my_rank],
            sizePerRank * group.my_rank,
            sizePerRank,
        )

    # writes are one-sided, so poll until the incoming slices have landed
    poll_for = 100
    for _ in range(poll_for):
        all_correct = cp.array_equal(memory, memory_expected)
        if all_correct:
            break
        time.sleep(0.1)
    for conn in connections:
        connections[conn].flush()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
    assert all_correct
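

# One-sided write plus a flag update: each rank forwards the buffer to the next
# rank and then raises a signal value so the receiver knows the payload is
# complete before it reads.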
@parametrize_mpi_groups(2, 4, 8, 16)
@parametrize_transport("IB", "NVLink")
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20, 27]])
@pytest.mark.parametrize("device", ["cuda", "cpu"])
def test_connection_write_and_signal(mpi_group: MpiGroup, transport: str, nelem: int, device: str):
    # this test starts with a random tensor on rank 0, rotates it through all ranks,
    # and finally checks on rank 0 that it still matches the original values
    if device == "cpu" and transport == "NVLink":
        pytest.skip("NVLink does not work with host-allocated memory")
    group, connections = create_and_connect(mpi_group, transport)
    xp = cp if device == "cuda" else np
    if group.my_rank == 0:
        memory = xp.random.randn(nelem)
        memory = memory.astype(xp.float32)
        memory_expected = memory.copy()
    else:
        memory = xp.zeros(nelem, dtype=xp.float32)
    if device == "cuda":
        cp.cuda.runtime.deviceSynchronize()

    signal_memory = xp.zeros(1, dtype=xp.int64)
    all_reg_memories = group.register_tensor_with_connections(memory, connections)
    all_signal_memories = group.register_tensor_with_connections(signal_memory, connections)

    next_rank = (group.my_rank + 1) % group.nranks
    bufferSize = nelem * memory.itemsize
    dummy_memory_on_cpu = np.zeros(1, dtype=np.int64)
    signal_val = 123

    if group.my_rank != 0:
        while signal_memory[0] != signal_val:
            time.sleep(0.1)
    connections[next_rank].write(all_reg_memories[next_rank], 0, all_reg_memories[group.my_rank], 0, bufferSize)
    connections[next_rank].flush()
    if group.my_rank == 0:
        memory[:] = 0
        if device == "cuda":
            cp.cuda.runtime.deviceSynchronize()
    connections[next_rank].update_and_sync(
        all_signal_memories[next_rank], 0, dummy_memory_on_cpu.ctypes.data, signal_val
    )
    all_correct = False
    if group.my_rank == 0:
        while signal_memory[0] != signal_val:
            time.sleep(0.1)
        all_correct = cp.array_equal(memory, memory_expected)
    group.barrier()
    all_correct = mpi_group.comm.bcast(all_correct, 0)
    assert all_correct


@parametrize_mpi_groups(2, 4, 8, 16)
@skipif_ib
def test_h2h_semaphores(mpi_group: MpiGroup):
    group, connections = create_and_connect(mpi_group, "IB")

    semaphores = group.make_semaphore(connections, Host2HostSemaphore)
    for rank in connections:
        semaphores[rank].signal()
    for rank in connections:
        semaphores[rank].wait()
    group.barrier()
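

# Thin wrapper that compiles one of the test kernels and packs its launch
# parameters into a raw byte string; per-rank device handles are serialized
# and copied to the GPU so the kernel can index them by rank.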
class MscclppKernel:
    def __init__(
        self,
        test_name,
        my_rank=None,
        nranks=None,
        semaphore_or_channels=None,
        tensor=None,
        use_packet=False,
        scratch=None,
        fifo=None,
    ):
        if test_name == "h2d_semaphore":
            self._kernel = KernelBuilder(
                file="h2d_semaphore_test.cu", kernel_name="h2d_semaphore"
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "d2d_semaphore":
            self._kernel = KernelBuilder(
                file="d2d_semaphore_test.cu", kernel_name="d2d_semaphore"
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "sm_channel":
            self._kernel = KernelBuilder(file="sm_channel_test.cu", kernel_name="sm_channel").get_compiled_kernel()
            self.nblocks = nranks
            self.nthreads = 1024
        elif test_name == "fifo":
            self._kernel = KernelBuilder(file="fifo_test.cu", kernel_name="fifo").get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = 1
        elif test_name == "proxy":
            self._kernel = KernelBuilder(file="proxy_test.cu", kernel_name="proxy").get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = nranks
        elif test_name == "simple_proxy_channel":
            self._kernel = KernelBuilder(
                file="simple_proxy_channel_test.cu", kernel_name="simple_proxy_channel"
            ).get_compiled_kernel()
            self.nblocks = 1
            self.nthreads = 1024
        else:
            assert False

        self.params = b""
        if test_name in ["h2d_semaphore", "d2d_semaphore", "sm_channel", "simple_proxy_channel"]:
            first_arg = next(iter(semaphore_or_channels.values()))
            size_of_semaphore_or_channels = len(first_arg.device_handle().raw)
            device_handles = []
            for rank in range(nranks):
                if rank == my_rank:
                    device_handles.append(
                        bytes(size_of_semaphore_or_channels)
                    )  # just zeros for semaphores that do not exist
                else:
                    device_handles.append(semaphore_or_channels[rank].device_handle().raw)
            # keep a reference to the device handles so that they don't get garbage collected
            self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8)
            self.params += pack(self._d_semaphore_or_channels, my_rank, nranks)
            if test_name == "sm_channel":
                self.params += pack(tensor.size, use_packet)
            if test_name == "simple_proxy_channel":
                self.params += pack(tensor, scratch, tensor.size, use_packet)
        elif test_name == "fifo":
            self.params = fifo.device_handle().raw
        elif test_name == "proxy":
            semaphore_device_handles = [semaphore.device_handle().raw for semaphore in semaphore_or_channels]
            self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(semaphore_device_handles)), dtype=cp.uint8)
            self.params = pack(my_rank, nranks) + fifo.raw + pack(self._d_semaphore_or_channels)

    def __call__(self):
        return self._kernel.launch_kernel(self.params, self.nblocks, self.nthreads, 0, None)
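

# For Host2DeviceSemaphore the host side signals and the device side waits,
# so the kernel spins on the GPU until the host thread below calls signal().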
@parametrize_mpi_groups(2, 4, 8, 16)
@parametrize_transport("NVLink", "IB")
def test_h2d_semaphores(mpi_group: MpiGroup, transport: str):
    def signal(semaphores):
        for rank in semaphores:
            semaphores[rank].signal()

    group, connections = create_and_connect(mpi_group, transport)

    semaphores = group.make_semaphore(connections, Host2DeviceSemaphore)
    kernel = MscclppKernel("h2d_semaphore", group.my_rank, group.nranks, semaphores)
    kernel()

    # workaround: use a separate thread to let cudaMemcpyAsync run concurrently with the kernel
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(signal, semaphores)
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()


@parametrize_mpi_groups(2, 4, 8, 16)
def test_d2d_semaphores(mpi_group: MpiGroup):
    group, connections = create_and_connect(mpi_group, "NVLink")

    semaphores = group.make_semaphore(connections, SmDevice2DeviceSemaphore)
    group.barrier()
    kernel = MscclppKernel("d2d_semaphore", group.my_rank, group.nranks, semaphores)
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()


@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@pytest.mark.parametrize("use_packet", [False, True])
def test_sm_channels(mpi_group: MpiGroup, nelem: int, use_packet: bool):
    group, connections = create_and_connect(mpi_group, "NVLink")

    memory = cp.zeros(nelem, dtype=cp.int32)
    if use_packet:
        scratch = cp.zeros(nelem * 2, dtype=cp.int32)
    else:
        scratch = None
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1

    if use_packet:
        channels = group.make_sm_channels_with_packet(memory, scratch, connections)
    else:
        channels = group.make_sm_channels(memory, connections)
    kernel = MscclppKernel("sm_channel", group.my_rank, group.nranks, channels, memory, use_packet, scratch)
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)
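

# The fifo kernel pushes a trigger from the device; the host polls until a
# trigger with fst == 123 (presumably hard-coded in fifo_test.cu) shows up.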
@parametrize_mpi_groups(2, 4, 8, 16)
def test_fifo(mpi_group: MpiGroup):
    fifo = Fifo()
    kernel = MscclppKernel("fifo", fifo=fifo)
    kernel()

    poll_for = 100
    for _ in range(poll_for):
        trigger = fifo.poll()
        if trigger.fst == 123:
            return
        time.sleep(0.1)
    assert False
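

# Drives writes through a custom C++ proxy (_ext.MyProxyService): the kernel
# pushes triggers into the proxy's fifo, which the host-side proxy thread
# services by issuing connection writes and semaphore signals on its behalf.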
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@parametrize_transport("IB", "NVLink")
def test_proxy(mpi_group: MpiGroup, nelem: int, transport: str):
    group, connections = create_and_connect(mpi_group, transport)

    memory = cp.zeros(nelem, dtype=cp.int32)
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()
    all_reg_memories = group.register_tensor_with_connections(memory, connections)
    semaphores = group.make_semaphore(connections, Host2DeviceSemaphore)

    list_conn = []
    list_sem = []
    list_reg_mem = []
    first_conn = next(iter(connections.values()))
    first_sem = next(iter(semaphores.values()))
    for rank in range(group.nranks):
        if rank in connections:
            list_conn.append(connections[rank])
            list_sem.append(semaphores[rank])
        else:
            list_conn.append(first_conn)  # just for simplicity of indexing
            list_sem.append(first_sem)
        list_reg_mem.append(all_reg_memories[rank])

    proxy = _ext.MyProxyService(group.my_rank, group.nranks, nelem * memory.itemsize, list_conn, list_reg_mem, list_sem)
    fifo_device_handle = proxy.fifo_device_handle()

    kernel = MscclppKernel(
        "proxy", my_rank=group.my_rank, nranks=group.nranks, semaphore_or_channels=list_sem, fifo=fifo_device_handle
    )
    proxy.start()
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    proxy.stop()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)
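

# Same all-gather pattern as above but over the built-in ProxyService; with
# use_packet=True the data goes through the packet-formatted scratch buffer.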
@parametrize_mpi_groups(2, 4, 8, 16)
@pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]])
@parametrize_transport("NVLink", "IB")
@pytest.mark.parametrize("use_packet", [False, True])
def test_simple_proxy_channel(mpi_group: MpiGroup, nelem: int, transport: str, use_packet: bool):
    group, connections = create_and_connect(mpi_group, transport)

    memory = cp.zeros(nelem, dtype=cp.int32)
    if use_packet:
        scratch = cp.zeros(nelem * 2, dtype=cp.int32)
    else:
        scratch = cp.zeros(1, dtype=cp.int32)  # just so that we can pass a valid ptr
    nelemPerRank = nelem // group.nranks
    memory[(nelemPerRank * group.my_rank) : (nelemPerRank * (group.my_rank + 1))] = group.my_rank + 1
    memory_expected = cp.zeros_like(memory)
    for rank in range(group.nranks):
        memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
    group.barrier()

    proxy_service = ProxyService()
    if use_packet:
        memory_to_register = scratch
    else:
        memory_to_register = memory
    simple_channels = group.make_proxy_channels_with_packet(proxy_service, memory_to_register, connections)

    kernel = MscclppKernel(
        "simple_proxy_channel",
        my_rank=group.my_rank,
        nranks=group.nranks,
        semaphore_or_channels=simple_channels,
        tensor=memory,
        use_packet=use_packet,
        scratch=scratch,
    )
    proxy_service.start_proxy()
    group.barrier()
    kernel()
    cp.cuda.runtime.deviceSynchronize()
    proxy_service.stop_proxy()
    group.barrier()
    assert cp.array_equal(memory, memory_expected)