mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-25 15:24:43 +00:00
## Summary This PR addresses a multicast resource leak, fixes `cuMemMap` offset handling for multicast handles, renames NVLS allreduce algorithm classes for clarity, and adds a new unit test for `SwitchChannel`. ### Bug Fixes #### 1. Fix multicast allocation handle leak in `createMulticast()` (`gpu_ipc_mem.cc`) `GpuIpcMemHandle::createMulticast()` called `cuMulticastCreate(&allocHandle, ...)` but never released the local `allocHandle` after exporting it to shareable handles (POSIX FD / Fabric). This caused a reference count leak — the multicast object was never freed even after all mappings and imported handles were released. Per the [CUDA Driver API docs for `cuMemRelease`](https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__VA.html): > *"The memory allocation will be freed when all outstanding mappings to the memory are unmapped and when all outstanding references to the handle (including its shareable counterparts) are also released."* The fix adds `cuMemRelease(allocHandle)` after export, matching the existing pattern used for regular allocations in `GpuIpcMemHandle::create()`. **Impact:** Without this fix, repeated creation/destruction of NVLS connections causes OOM after ~120 iterations when allocating 1GB multicast buffers on H100. #### 2. Fix `cuMemMap` offset for multicast handles (`gpu_ipc_mem.cc`) `cuMemMap` requires `offset=0` for multicast handles. Previously, the code attempted to map at a non-zero offset within the multicast object, leading to errors when binding multiple buffers to the same `NvlsConnection`. The fix maps the entire range `[0, mcOffset + bufferSize)` and returns the pointer offset by `mcOffset`. This only consumes extra virtual address space; no additional physical memory is used. ### Refactoring #### 3. Rename NVLS allreduce algorithm classes Renamed for clarity: - `AllreduceNvls` → `AllreduceNvlsZeroCopy` - `AllreduceNvlsWithCopy` → `AllreduceNvlsWarpPipeline` - `AllreduceNvlsWithCopy2` → `AllreduceNvlsBlockPipeline` Updated all references in builder, selector, docs, and examples. #### 4. Move `nvlsConnections` setup to `initialize()` Moved `nvlsConnections_` from `AlgorithmCtx` (which no longer has this member) to individual algorithm class members, initialized in their `initialize()` methods. ### Tests #### 5. Add `TwoChannelsSameConnection` test New unit test that creates two `SwitchChannel` instances from the same `NvlsConnection`, performs reduce operations on both, and verifies correctness. This exercises the multi-bind path that triggered the `cuMemMap` offset fix. ### Files Changed - `src/core/gpu_ipc_mem.cc` — multicast handle leak fix + cuMemMap offset fix - `src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu` (renamed) - `src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu` (renamed) - `src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu` (renamed) - `src/ext/collectives/allreduce/allreduce_nvls_packet.cu` — nvlsConnections fix - `src/ext/collectives/include/allreduce/*.hpp` — renamed headers - `src/ext/collectives/algorithm_collection_builder.cc` — updated references - `src/ext/nccl/algorithm_selector.cc` — updated algorithm names - `test/mp_unit/switch_channel_tests.cu` — new test - `docs/guide/mscclpp-torch-integration.md` — updated names - `examples/torch-integration/customized_comm_with_default_algo.py` — updated names
123 lines
4.5 KiB
Python
123 lines
4.5 KiB
Python
# Copyright (c) Microsoft Corporation.
|
|
# Licensed under the MIT License.
|
|
|
|
# MSCCLPP_MASTER_ADDR=<master_ip> MSCCLPP_MASTER_PORT=<port> torchrun --nnodes=1 --nproc_per_node=8 customized_comm_with_default_algo.py
|
|
|
|
import os
|
|
import torch
|
|
import mscclpp.utils as mscclpp_utils
|
|
import mscclpp
|
|
import mscclpp.ext
|
|
import netifaces as ni
|
|
import ipaddress
|
|
|
|
|
|
def load_algorithms(scratch_buffer: torch.tensor, rank: int) -> mscclpp.AlgorithmCollection:
|
|
collection_builder = mscclpp.ext.AlgorithmCollectionBuilder()
|
|
return collection_builder.build_default_algorithms(
|
|
scratch_buffer=scratch_buffer.data_ptr(),
|
|
scratch_buffer_size=scratch_buffer.nbytes,
|
|
rank=rank,
|
|
)
|
|
|
|
|
|
def interfaces_for_ip_netifaces(ip: str):
|
|
target = ipaddress.ip_address(ip)
|
|
for interface in ni.interfaces():
|
|
addresses = ni.ifaddresses(interface)
|
|
if ni.AF_INET in addresses:
|
|
for link in addresses[ni.AF_INET]:
|
|
if "addr" in link:
|
|
addr = ipaddress.ip_address(link["addr"])
|
|
if addr == target:
|
|
return interface
|
|
return None
|
|
|
|
|
|
def to_mscclpp_reduce_op(op: torch.distributed.ReduceOp) -> mscclpp.ReduceOp:
|
|
if op == torch.distributed.ReduceOp.SUM:
|
|
return mscclpp.ReduceOp.SUM
|
|
elif op == torch.distributed.ReduceOp.MIN:
|
|
return mscclpp.ReduceOp.MIN
|
|
else:
|
|
raise ValueError(f"unsupported op: {op}")
|
|
|
|
|
|
class CustomizedComm:
|
|
def __init__(self, comm: mscclpp.CommGroup):
|
|
self.comm = comm
|
|
self.rank = comm.my_rank
|
|
self.world_size = comm.nranks
|
|
self.local_rank = comm.my_rank % comm.nranks_per_node
|
|
self.n_ranks_per_node = comm.nranks_per_node
|
|
dlpack = mscclpp.RawGpuBuffer(1 << 27).to_dlpack(data_type=str(torch.float16))
|
|
self.scratch_buffer = torch.utils.dlpack.from_dlpack(dlpack)
|
|
algorithms = load_algorithms(scratch_buffer=self.scratch_buffer, rank=self.rank)
|
|
self._algorithm_nvls_packet = [
|
|
algo
|
|
for algo in algorithms
|
|
if algo.collective == "allreduce" and algo.name == "default_allreduce_nvls_packet"
|
|
][0]
|
|
self._algorithm_nvls_nonzero_copy = [
|
|
algo
|
|
for algo in algorithms
|
|
if algo.collective == "allreduce" and algo.name == "default_allreduce_nvls_warp_pipeline"
|
|
][0]
|
|
|
|
def all_reduce(self, tensor: torch.Tensor, op=torch.distributed.ReduceOp.SUM, stream: torch.cuda.Stream = None):
|
|
assert op == torch.distributed.ReduceOp.SUM
|
|
algo = None
|
|
if tensor.nbytes < 1 << 20:
|
|
algo = self._algorithm_nvls_packet
|
|
else:
|
|
algo = self._algorithm_nvls_nonzero_copy
|
|
algo.execute(
|
|
comm=self.comm.communicator,
|
|
input_buffer=tensor.data_ptr(),
|
|
output_buffer=tensor.data_ptr(),
|
|
input_size=tensor.nbytes,
|
|
output_size=tensor.nbytes,
|
|
dtype=mscclpp_utils.torch_dtype_to_mscclpp_dtype(tensor.dtype),
|
|
op=to_mscclpp_reduce_op(op),
|
|
stream=stream.cuda_stream if stream is not None else 0,
|
|
)
|
|
|
|
def barrier(self):
|
|
tensor = torch.empty(1, dtype=torch.float, device=torch.device("cuda"))
|
|
self.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM, stream=torch.cuda.current_stream())
|
|
|
|
def destroy(self):
|
|
self._algorithm_nvls_nonzero_copy = None
|
|
self._algorithm_nvls_packet = None
|
|
self.scratch_buffer = None
|
|
self.comm = None
|
|
|
|
|
|
def init_dist() -> CustomizedComm:
|
|
rank = int(os.environ["RANK"])
|
|
world = int(os.environ["WORLD_SIZE"])
|
|
master_addr = os.environ["MSCCLPP_MASTER_ADDR"]
|
|
master_port = os.environ["MSCCLPP_MASTER_PORT"]
|
|
interface = interfaces_for_ip_netifaces(master_addr)
|
|
if interface is None:
|
|
raise ValueError(f"Cannot find network interface for IP address {master_addr}")
|
|
interfaceIpPortTrio = f"{interface}:{master_addr}:{master_port}"
|
|
mscclpp_group = mscclpp.CommGroup(interfaceIpPortTrio=interfaceIpPortTrio, rank=rank, size=world)
|
|
return CustomizedComm(mscclpp_group)
|
|
|
|
|
|
def main():
|
|
local = int(os.environ["LOCAL_RANK"])
|
|
torch.cuda.set_device(local)
|
|
comm = init_dist()
|
|
comm.barrier()
|
|
input_data = torch.randn(1 << 22, dtype=torch.float16, device=torch.device("cuda"))
|
|
comm.all_reduce(input_data, op=torch.distributed.ReduceOp.SUM, stream=torch.cuda.current_stream())
|
|
comm.barrier()
|
|
comm.destroy()
|
|
print(f"rank {local} All-reduce operation completed successfully.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|