AlltoAll Test Support (#606)

Co-authored-by: Binyang Li <binyli@microsoft.com>
This commit is contained in:
Caio Rocha
2025-08-15 16:00:41 -07:00
committed by GitHub
parent 2b40fe37b3
commit 9261b1d278
13 changed files with 778 additions and 23 deletions

View File

@@ -217,3 +217,22 @@ class ReduceScatter(Collective):
}
rank_buffers.append(buffers)
return rank_buffers
class AllToAll(Collective):
    """All-to-all collective: every rank exchanges one chunk with every rank."""

    def __init__(self, num_ranks, chunk_factor, inplace):
        Collective.__init__(self, num_ranks, chunk_factor, inplace)
        self.name = "alltoall"

    def init_buffers(self):
        """Create per-rank input/output buffers of num_ranks * chunk_factor chunks."""
        # Input and output are the same size: one chunk per peer (scaled by chunk_factor).
        buffer_size = self.num_ranks * self.chunk_factor
        per_rank = []
        for rank in range(self.num_ranks):
            per_rank.append(
                {
                    BufferType.input: BaseBuffer(rank, BufferType.input, 0, buffer_size),
                    BufferType.output: BaseBuffer(rank, BufferType.output, 0, buffer_size),
                }
            )
        return per_rank

View File

@@ -170,9 +170,9 @@ class CopyOperation(BaseOperation):
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap source/destination chunk indices for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (index, size, instance, num_instances) -> new index.
    """
    # The diff residue kept both the old 3-arg and new 4-arg call forms; only
    # the 4-arg form (which forwards chunk.size to the policy) is correct.
    for chunk in self.src_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
    for chunk in self.dst_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
def to_dict(self):
result = {"name": self.name.value}
@@ -448,9 +448,9 @@ class GetOperation(BaseOperation):
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap source/destination chunk indices for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (index, size, instance, num_instances) -> new index.
    """
    # Keep only the updated 4-arg call form; the stale 3-arg diff line is dropped.
    for chunk in self.src_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
    for chunk in self.dst_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
def __add__(self, other):
fused_operation = None
@@ -529,9 +529,9 @@ class PutOperation(BaseOperation):
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap source/destination chunk indices for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (index, size, instance, num_instances) -> new index.
    """
    # Keep only the updated 4-arg call form; the stale 3-arg diff line is dropped.
    for chunk in self.src_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
    for chunk in self.dst_buff:
        chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
def __add__(self, other):
fused_operation = None
@@ -635,13 +635,13 @@ class ReduceOperation(BaseOperation):
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap all four chunk lists of a reduce operation for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (index, size, instance, num_instances) -> new index.
    """
    # The diff residue kept both call forms; only the 4-arg form is correct.
    # All four buffer lists get the same remapping, so iterate them together.
    for buff in (self.local_src_buff, self.local_dst_buff, self.remote_src_buff, self.remote_dst_buff):
        for chunk in buff:
            chunk.index = replication_function(chunk.index, chunk.size, instance, num_instances)
def __add__(self, other):
fused_operation = None
@@ -755,8 +755,8 @@ class GroupLoadReduce(BaseOperation):
self.reduce_operation = reduce_operation
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap the buffer offset and destination chunk index for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (value, size, instance, num_instances) -> new value.
    """
    # Keep only the updated 4-arg call form (forwards self.size); the stale
    # 3-arg diff lines are dropped.
    self.buffer_offset = replication_function(self.buffer_offset, self.size, instance, num_instances)
    self.dst_chunk.index = replication_function(self.dst_chunk.index, self.size, instance, num_instances)
def __add__(self, other):
fused_operation = None
@@ -812,8 +812,8 @@ class GroupStore(BaseOperation):
self.channel_type = channel_type
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap the buffer offset and source chunk index for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (value, size, instance, num_instances) -> new value.
    """
    # Keep only the updated 4-arg call form (forwards self.size); the stale
    # 3-arg diff lines are dropped.
    self.buffer_offset = replication_function(self.buffer_offset, self.size, instance, num_instances)
    self.src_chunk.index = replication_function(self.src_chunk.index, self.size, instance, num_instances)
def to_dict(self):
result = {"name": self.name.value}
@@ -849,9 +849,9 @@ class GroupLoadReduceStore(BaseOperation):
def shift_buffers(self, instance, num_instances, replication_function):
    """Remap every source/destination index for a replicated instance.

    Args:
        instance: index of the replicated instance being emitted.
        num_instances: total number of replicated instances.
        replication_function: callable (index, size, instance, num_instances) -> new index.
    """
    # Keep only the updated 4-arg call form (forwards self.size); the stale
    # 3-arg diff lines are dropped.
    for i in range(len(self.src_index)):
        self.src_index[i] = replication_function(self.src_index[i], self.size, instance, num_instances)
    for i in range(len(self.dst_index)):
        self.dst_index[i] = replication_function(self.dst_index[i], self.size, instance, num_instances)
def to_dict(self):
result = {"name": self.name.value}
@@ -911,6 +911,10 @@ class PipelineOperation(BaseOperation):
for operation in self.operations:
operation.shift_buffers(instance, num_instances, replication_function)
def shift_ids(self, instance, num_instances, replication_function):
    """Propagate id shifting to every operation nested in this pipeline."""
    for inner_op in self.operations:
        inner_op.shift_ids(instance, num_instances, replication_function)
def __add__(self, other):
fused_operation = None
if (self.get_data_sync() & SyncType.after) == SyncType.after and check_data_sync_op(other):

View File

@@ -166,7 +166,7 @@ class CollectiveProgram:
def get_buffer_replication_policy_function(self):
    """Return the index-remapping function used when replicating operations.

    The returned callable has signature (value, size, instance, num_instances)
    and is passed to each operation's shift_buffers.
    """
    if self.replication_policy == ReplicationPolicy.interleaved:
        # Interleaved: stretch indices by the instance count and offset each
        # instance by its position scaled by the chunk size.
        return lambda value, size, instance, num_instances: value * num_instances + instance * size
    else:
        # Identity policy: indices are left untouched.  The signature matches
        # the interleaved variant so call sites (which now pass 4 args) work
        # with either policy.
        return lambda value, size, instance, num_instances: value

View File

@@ -0,0 +1,86 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
def allgather_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an MSCCLPP AllGather program where every rank puts its own chunk
    directly into every peer's output buffer.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllGather(gpu_size, chunksperloop, True)
    with MSCCLPPProgram(
        name,
        collective,
        gpu_size,
        protocol="Simple",
        instances=32,
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # One memory channel per ordered (dst, src) pair of distinct ranks.
        channels = {}
        for gpu in range(gpu_size):
            src_rank_id = gpu
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
        # Initial Synchronization: every rank signals all peers, then waits on
        # all peers (relaxed; data sync after the waits).
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id].signal(tb=0, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id].wait(tb=0, data_sync=SyncType.after, relaxed=True)
        # Perform AllGather: each rank pushes its own chunk into the matching
        # slot of every other rank's output buffer.
        for src_rank_id in range(gpu_size):
            src_rank = Rank(src_rank_id)
            src_buffer = src_rank.get_output_buffer()
            src_chunk = src_buffer[src_rank_id : src_rank_id + 1]
            for peer in range(1, gpu_size):
                dst_rank_id = (src_rank_id + peer) % gpu_size
                dst_rank = Rank(dst_rank_id)
                dst_input_buffer = dst_rank.get_output_buffer()
                dst_chunk = dst_input_buffer[src_rank_id : src_rank_id + 1]
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id].put(dst_chunk, src_chunk, tb=0)
        # Final Synchronization (mirrors the initial barrier, without the
        # data_sync on the waits).
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id].signal(tb=0, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id].wait(tb=0, relaxed=True)
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    allgather_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,98 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
def alltoall_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an in-place MSCCLPP AllToAll using per-rank scratch buffers.

    Each rank puts chunk ``dst`` of its input buffer into the destination
    rank's scratch buffer, then every rank copies the received chunks from
    scratch back into its own input buffer.  One thread block per remote peer.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=2,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer (gpu_size - 1 slots: one per peer)
        channels = {}
        scratch_buffer = {}
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
        # Initial Synchronization
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != peer:
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    channels[dst_rank_id, src_rank_id].signal(tb=tb, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    channels[dst_rank_id, src_rank_id].wait(tb=tb, relaxed=True, data_sync=SyncType.after)
        # Put Data in the Remote Rank
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(gpu_size):
                dst_rank_id = peer
                if dst_rank_id != src_rank_id:
                    # Slot reserved for this source on the remote scratch buffer.
                    remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    channels[dst_rank_id, src_rank_id].put(
                        scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        tb=tb,
                    )
                    channels[dst_rank_id, src_rank_id].signal(tb=tb, data_sync=SyncType.before)
        # Copy Data From Scratch Buffer
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(gpu_size):
                dst_rank_id = peer
                if dst_rank_id != src_rank_id:
                    index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    tb = index
                    channels[dst_rank_id, src_rank_id].wait(tb=tb, data_sync=SyncType.after)
                    src_rank.copy(
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        scratch_buffer[src_rank_id][index : index + 1],
                        tb=tb,
                    )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,108 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
from mscclpp.language.loop import *
def alltoall_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit a pipelined in-place MSCCLPP AllToAll: the put phase and the
    scratch-to-input copy phase run in disjoint thread-block ranges and are
    handed off through per-rank semaphores inside a loop-iteration context.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=2,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer
        channels = {}
        sync_channels = {}
        semaphores = {}
        scratch_buffer = {}
        # Copy thread blocks are placed after the gpu_size - 1 put thread blocks.
        tb_offset = gpu_size - 1
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for tb in range(gpu_size):
                semaphores[src_rank_id, tb] = Semaphore(src_rank_id, initial_value=0)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    sync_channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
        # Initial Synchronization
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != peer:
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    sync_channels[dst_rank_id, src_rank_id].signal(tb=tb, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    sync_channels[dst_rank_id, src_rank_id].wait(tb=tb, relaxed=True, data_sync=SyncType.after)
        # NOTE(review): both the put phase and the copy phase are taken to be
        # inside the loop-iteration context (the semaphore release/acquire
        # pairing spans both) — confirm against the original file layout.
        with LoopIterationContext(unit=2**19, num_chunks=1):
            # Put Data in the Remote Rank
            for gpu in range(gpu_size):
                src_rank_id = gpu
                src_rank = Rank(src_rank_id)
                input_buffer = src_rank.get_input_buffer()
                for peer in range(gpu_size):
                    dst_rank_id = peer
                    if dst_rank_id != src_rank_id:
                        remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                        tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                        channels[dst_rank_id, src_rank_id].put(
                            scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                            input_buffer[dst_rank_id : dst_rank_id + 1],
                            tb=tb,
                        )
                        channels[dst_rank_id, src_rank_id].signal(tb=tb, data_sync=SyncType.before)
                        semaphores[gpu, tb].release(tb=tb)
            # Copy Data From Scratch Buffer
            for gpu in range(gpu_size):
                src_rank_id = gpu
                src_rank = Rank(src_rank_id)
                input_buffer = src_rank.get_input_buffer()
                for peer in range(gpu_size):
                    dst_rank_id = peer
                    if dst_rank_id != src_rank_id:
                        index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                        tb = tb_offset + index
                        semaphores[gpu, tb - tb_offset].acquire(tb=tb)
                        channels[dst_rank_id, src_rank_id].wait(tb=tb, data_sync=SyncType.after)
                        src_rank.copy(
                            input_buffer[dst_rank_id : dst_rank_id + 1],
                            scratch_buffer[src_rank_id][index : index + 1],
                            tb=tb,
                        )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,85 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
from mscclpp.language.loop import *
def alltoall_packet_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an LL-protocol MSCCLPP AllToAll: packets carry their own flags, so
    no explicit signal/wait is needed between the put and unpack phases.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=2,
        protocol="LL",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=True,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer
        channels = {}
        scratch_buffer = {}
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
        # Put Data in the Remote Rank
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(gpu_size):
                dst_rank_id = peer
                if dst_rank_id != src_rank_id:
                    remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                    tb = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    channels[dst_rank_id, src_rank_id].put_packet(
                        scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        tb=tb,
                    )
        # Copy Data From Scratch Buffer
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(gpu_size):
                dst_rank_id = peer
                if dst_rank_id != src_rank_id:
                    index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    tb = index
                    src_rank.unpack_packet(
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        scratch_buffer[src_rank_id][index : index + 1],
                        tb=tb,
                    )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_packet_example(
        args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size
    )

View File

@@ -0,0 +1,101 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
from mscclpp.language.loop import *
def alltoall_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an in-place MSCCLPP AllToAll where all puts run in thread block 0
    and all scratch-to-input copies run in thread block 1, coordinated through
    per-(src, dst) semaphores.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=16,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer
        channels = {}
        sync_channels = {}
        semaphores = {}
        scratch_buffer = {}
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    sync_channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    semaphores[src_rank_id, dst_rank_id] = Semaphore(src_rank_id, initial_value=0)
        # Initial Synchronization
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != peer:
                    sync_channels[dst_rank_id, src_rank_id].signal(tb=0, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    sync_channels[dst_rank_id, src_rank_id].wait(tb=0, relaxed=True, data_sync=SyncType.after)
        # Put Data in the Remote Rank
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(1, gpu_size):
                dst_rank_id = (src_rank_id + peer) % gpu_size
                if dst_rank_id != src_rank_id:
                    remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                    channels[dst_rank_id, src_rank_id].put(
                        scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        tb=0,
                    )
                    channels[dst_rank_id, src_rank_id].signal(tb=0, data_sync=SyncType.before)
                    semaphores[src_rank_id, dst_rank_id].release(tb=0)
        # Copy Data From Scratch Buffer (reverse peer order to pair with puts)
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(1, gpu_size):
                dst_rank_id = (src_rank_id - peer) % gpu_size
                if dst_rank_id != src_rank_id:
                    index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    semaphores[src_rank_id, dst_rank_id].acquire(tb=1)
                    channels[dst_rank_id, src_rank_id].wait(tb=1, data_sync=SyncType.after)
                    src_rank.copy(
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        scratch_buffer[src_rank_id][index : index + 1],
                        tb=1,
                    )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,104 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
from mscclpp.language.loop import *
def alltoall_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an out-of-place MSCCLPP AllToAll: remote chunks are staged in a
    scratch buffer then copied to the output buffer; the local chunk is copied
    directly from input to output.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, False)  # out-of-place
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=16,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer
        channels = {}
        sync_channels = {}
        scratch_buffer = {}
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    sync_channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
        # Initial Synchronization
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != peer:
                    sync_channels[dst_rank_id, src_rank_id].signal(relaxed=True, tb=0)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    sync_channels[dst_rank_id, src_rank_id].wait(tb=0, relaxed=True, data_sync=SyncType.after)
        # Put Data in the Remote Rank
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            for peer in range(1, gpu_size):
                dst_rank_id = (src_rank_id + peer) % gpu_size
                if dst_rank_id != src_rank_id:
                    remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                    channels[dst_rank_id, src_rank_id].put(
                        scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        tb=0,
                    )
                    channels[dst_rank_id, src_rank_id].signal(tb=0, data_sync=SyncType.before)
        # Copy Data From Scratch Buffer (peer == 0 handles the local chunk)
        for gpu in range(gpu_size):
            src_rank_id = gpu
            src_rank = Rank(src_rank_id)
            input_buffer = src_rank.get_input_buffer()
            output_buffer = src_rank.get_output_buffer()
            for peer in range(0, gpu_size):
                dst_rank_id = (src_rank_id - peer) % gpu_size
                if dst_rank_id != src_rank_id:
                    index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                    channels[dst_rank_id, src_rank_id].wait(tb=1, data_sync=SyncType.after)
                    src_rank.copy(
                        output_buffer[dst_rank_id : dst_rank_id + 1],
                        scratch_buffer[src_rank_id][index : index + 1],
                        tb=1,
                    )
                else:
                    # Local chunk goes straight from input to output.
                    src_rank.copy(
                        output_buffer[dst_rank_id : dst_rank_id + 1],
                        input_buffer[dst_rank_id : dst_rank_id + 1],
                        tb=0,
                    )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,122 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
from mscclpp.language.loop import *
def find_pairs(size):
    """Build a round-robin schedule of peer pairings.

    Returns a list of ``size - 1`` dicts; each dict maps a rank to the partner
    it exchanges with during that step, so every pair of distinct ranks meets
    in exactly one step.

    NOTE(review): the pairing is greedy; it completes for the even,
    power-of-two sizes used here — confirm before relying on other sizes.
    """
    remaining = [{j for j in range(size) if j != i} for i in range(size)]
    schedule = []
    for _ in range(size - 1):
        taken = set()
        pairing = {}
        for x in range(size):
            if x in taken:
                continue
            for y in range(x + 1, size):
                if y in taken or y not in remaining[x] or x not in remaining[y]:
                    continue
                pairing[x] = y
                pairing[y] = x
                taken.update((x, y))
                remaining[x].discard(y)
                remaining[y].discard(x)
                break
        schedule.append(pairing)
    return schedule
def alltoall_example(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit a pairwise in-place MSCCLPP AllToAll: in each of ``gpu_size - 1``
    steps every rank exchanges one chunk with a single partner taken from the
    find_pairs schedule.

    Args:
        name: program name recorded in the generated plan.
        gpu_size: number of participating GPUs/ranks.
        num_threads_per_block: thread-block size for the generated kernels.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllToAll(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=16,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Creating Channels and Scratch Buffer
        channels = {}
        sync_channels = {}
        semaphores = {}
        scratch_buffer = {}
        step_pairs = find_pairs(gpu_size)  # renamed: was misspelled "step_paris"
        for gpu in range(gpu_size):
            src_rank_id = gpu
            scratch_buffer[src_rank_id] = Buffer(src_rank_id, gpu_size - 1)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    sync_channels[dst_rank_id, src_rank_id] = MemoryChannel(dst_rank_id, src_rank_id)
                    semaphores[src_rank_id, dst_rank_id] = Semaphore(src_rank_id, initial_value=0)
        # Initial Synchronization
        for gpus in range(gpu_size):
            src_rank_id = gpus
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != peer:
                    sync_channels[dst_rank_id, src_rank_id].signal(tb=0, relaxed=True)
            for peer in range(gpu_size):
                dst_rank_id = peer
                if src_rank_id != dst_rank_id:
                    sync_channels[dst_rank_id, src_rank_id].wait(tb=0, relaxed=True, data_sync=SyncType.after)
        for step in range(gpu_size - 1):
            # Put Data in the Remote Rank
            for gpu in range(gpu_size):
                src_rank_id = gpu
                src_rank = Rank(src_rank_id)
                input_buffer = src_rank.get_input_buffer()
                dst_rank_id = step_pairs[step][src_rank_id]
                remote_index = src_rank_id if src_rank_id < dst_rank_id else src_rank_id - 1
                channels[dst_rank_id, src_rank_id].put(
                    scratch_buffer[dst_rank_id][remote_index : remote_index + 1],
                    input_buffer[dst_rank_id : dst_rank_id + 1],
                    tb=0,
                )
                channels[dst_rank_id, src_rank_id].signal(tb=0, data_sync=SyncType.before)
                semaphores[src_rank_id, dst_rank_id].release(tb=0)
            # Copy Data From Scratch Buffer
            for gpu in range(gpu_size):
                src_rank_id = gpu
                src_rank = Rank(src_rank_id)
                input_buffer = src_rank.get_input_buffer()
                dst_rank_id = step_pairs[step][src_rank_id]
                index = dst_rank_id if dst_rank_id < src_rank_id else dst_rank_id - 1
                semaphores[src_rank_id, dst_rank_id].acquire(tb=1)
                channels[dst_rank_id, src_rank_id].wait(tb=1, data_sync=SyncType.after)
                src_rank.copy(
                    input_buffer[dst_rank_id : dst_rank_id + 1], scratch_buffer[src_rank_id][index : index + 1], tb=1
                )
        print(JSON())


if __name__ == "__main__":
    # Guard module-level side effects so importing this file does not parse
    # argv or emit a program.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    alltoall_example(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -75,8 +75,10 @@ def bench_correctness(
coll = "all_gather"
elif "reducescatter" in collective:
coll = "reduce_scatter"
else:
elif "allreduce" in collective:
coll = "all_reduce"
else:
coll = "all_to_all"
test_data_kernel_name = "test_data_%s_%s" % (coll, dtype_str)
file_dir = os.path.dirname(os.path.abspath(__file__))

View File

@@ -98,4 +98,26 @@ TEST_DATA_ALL_REDUCE(int32, int)
TEST_DATA_REDUCE_SCATTER(float16, __half)
TEST_DATA_REDUCE_SCATTER(float32, float)
// The rendered diff showed the int32 instantiation twice (old + new line);
// keep a single instantiation — a duplicate would redefine the kernel symbol.
TEST_DATA_REDUCE_SCATTER(int32, int)
// Generate and verify an all-to-all result: each thread regenerates the
// pseudo-random reference values for its rank's slice and asserts the
// received data matches.  Fixes: "nem_elems_per_rank" typo and use of `int`
// for a size_t-derived element count (would truncate for > 2^31 elements).
#define TEST_DATA_ALL_TO_ALL(FuncNameType, DataType)                                                        \
  extern "C" __global__ void __launch_bounds__(1024, 1) test_data_all_to_all_##FuncNameType(                \
      DataType* result_buf, DataType* test_buf, size_t num_elems, int num_ranks, int my_rank, int seq) {    \
    size_t num_elems_per_rank = num_elems / num_ranks;                                                      \
    size_t offset = num_elems_per_rank * my_rank;                                                           \
    for (int rank = 0; rank < num_ranks; rank++) {                                                          \
      size_t rank_offset = rank * num_elems_per_rank;                                                       \
      /* Per-(thread, rank, sequence) seed keeps the pattern reproducible. */                               \
      unsigned int seed = (unsigned int)(blockIdx.x * blockDim.x + threadIdx.x + rank + seq);               \
      for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < num_elems; i += blockDim.x * gridDim.x) {  \
        seed = ranqd1(seed);                                                                                \
        /* Only the slice owned by my_rank is generated/checked here. */                                    \
        if (i >= my_rank * num_elems_per_rank && i < (my_rank + 1) * num_elems_per_rank) {                  \
          test_buf[rank_offset + i - offset] = DataType(seed % blockDim.x) / DataType(blockDim.x);          \
          assert(result_buf[rank_offset + i - offset] == test_buf[rank_offset + i - offset]);               \
        }                                                                                                   \
      }                                                                                                     \
    }                                                                                                       \
  }

TEST_DATA_ALL_TO_ALL(float16, __half)
TEST_DATA_ALL_TO_ALL(float32, float)
TEST_DATA_ALL_TO_ALL(int32, int)