Supporting New Packet Kernel Operation at Executor (#677)

This PR introduces three new operations to enhance flexibility and
performance at the executor.

One operation can be invoked directly via the DSL API and two operations
are created through fusion of existing operations, reducing overhead and
improving efficiency.

1. Port Channel Put Packet (Direct DSL API Call): Sends data from pkt
format to the remote side in pkt format via the port channel. Both
source and destination buffers must be scratch.

2. Reduce Copy Packet (Fusion):
Reduce Packet+Copy Packet=Reduce Copy Packet
Triggered when the destination buffer of Reduce Packet matches the
source buffer of Copy Packet.
Purpose: Combine reduction and copy into a single step for better
performance.

3. Reduce Copy Send Packet (Fusion):
Reduce Copy Packet+Put Packet=Reduce Copy Send Packet (when dst buffer
of Reduce Copy Packet matches src buffer of Put Packet)
Reduce Copy Packet+Read Put Packet=Reduce Copy Send Packet (when dst pkt
buffer of Reduce Copy Packet matches src buffer of Read Put Packet)
Purpose: Combine reduction, copy, and send operations into one optimized
pipeline.


Fusion Diagram
Reduce Packet + Copy Packet → Reduce Copy Packet
Reduce Copy Packet + Put Packet → Reduce Copy Send Packet
Reduce Copy Packet + Read Put Packet → Reduce Copy Send Packet

Beyond this, this PR adjusts the AllReduce 2-node algorithm; measured latency:

| Message Size | Latency (µs) |
|--------------|--------------|
| 1K           | 15.34        |
| 2K           | 15.88        |
| 4K           | 15.71        |
| 8K           | 16.01        |
| 16K          | 15.88        |
| 32K          | 16.21        |
| 64K          | 16.90        |
| 128K         | 18.24        |
| 256K         | 20.39        |
| 512K         | 25.26        |
| 1M           | 32.74        |
| 2M           | 53.64        |
This commit is contained in:
Caio Rocha
2025-11-13 14:08:44 -08:00
committed by GitHub
parent eb202780f5
commit 7eb3ff701a
12 changed files with 376 additions and 31 deletions

View File

@@ -12,10 +12,10 @@ from mscclpp.language.utils import AlgoSpec
default_algo_configs = [
{
"filename": "allreduce_2nodes.json",
"filename": "allreduce_2nodes_1K_64K.json",
"function": def_algo.allreduce_2nodes,
"spec": AlgoSpec(
name="allreduce_2nodes",
name="allreduce_2nodes_1K_64K",
collective=AllReduce(16, 1, True),
nranks_per_node=8,
world_size=16,
@@ -27,11 +27,32 @@ default_algo_configs = [
reuse_resources=True,
use_double_scratch_buffer=True,
min_message_size=1 << 10,
max_message_size=64 << 10,
tags={"default": 1},
),
"additional_kwargs": {"thread_block_group_size": 1},
},
{
"filename": "allreduce_2nodes_128K_2M.json",
"function": def_algo.allreduce_2nodes,
"spec": AlgoSpec(
name="allreduce_2nodes_128K_2M",
collective=AllReduce(16, 1, True),
nranks_per_node=8,
world_size=16,
in_place=True,
instances=1,
protocol="LL",
auto_sync=False,
num_threads_per_block=1024,
reuse_resources=True,
use_double_scratch_buffer=True,
min_message_size=128 << 10,
max_message_size=2 << 20,
tags={"default": 1},
),
"additional_args": [4],
}
"additional_kwargs": {"thread_block_group_size": 4},
},
]
@@ -46,12 +67,12 @@ def create_default_plans():
filename = config["filename"]
func = config["function"]
spec = config["spec"]
additional_args = config.get("additional_args", [])
additional_kwargs = config.get("additional_kwargs", {})
plan_path = os.path.join(plan_dir, filename)
try:
if additional_args:
prog = func(spec, *additional_args)
if additional_kwargs:
prog = func(spec, **additional_kwargs)
else:
prog = func(spec)

View File

@@ -682,6 +682,55 @@ class PortChannel:
get_program().add_operation(self.src_rank, tb, op)
def put_packets(self, dst_chunk: Chunk, src_chunk: Chunk, tb: int):
    """Transfer data from local buffer to remote scratch buffer in packet format.

    Performs a specialized put operation that transfers data from the source rank's buffer
    to the destination rank's scratch buffer in packet format through the port channel.
    The destination chunk must be a scratch buffer.

    Args:
        dst_chunk (Chunk): The destination scratch chunk on the destination rank.
        src_chunk (Chunk): The source chunk on the source rank (any buffer type).
        tb (int): The thread block ID that will execute this operation.

    Raises:
        RuntimeError: If chunk ranks don't match channel configuration, if destination
            chunk is not a scratch buffer, or if chunk sizes don't match.

    Example:
        >>> channel.put_packets(dst_chunk, src_chunk, tb=0)
    """
    if src_chunk.rank != self.src_rank:
        raise RuntimeError(
            f"Source chunk rank {src_chunk.rank} does not match current channel source rank {self.src_rank}."
        )
    if dst_chunk.rank != self.dst_rank:
        raise RuntimeError(
            f"Dst chunk rank {dst_chunk.rank} does not match current channel dst rank {self.dst_rank}."
        )
    if dst_chunk.buffer != BufferType.scratch:
        # Plain string literal: the message has no placeholders (was an f-string).
        raise RuntimeError("Destination chunk must be of type scratch.")
    if dst_chunk.size != src_chunk.size:
        raise RuntimeError(
            f"Destination chunk size {dst_chunk.size} does not match source chunk size {src_chunk.size}."
        )
    # Register the destination as a remote buffer reachable over this channel type,
    # then resolve per-thread-block chunk/channel ids before emitting the operation.
    remote_chunk = RemoteBuffer(src_chunk.rank, dst_chunk.rank, dst_chunk.buffer, self.channel_type)
    tb_chunk_id = get_program().setup_remote_chunk(self.src_rank, tb, remote_chunk, self.channel_type)
    tb_channel_ids = get_program().setup_channel(tb, self)
    # to_packet=True / from_packet=False: source data is read as-is and written
    # on the remote side in packet format.
    op = PutOperation(
        src_buff=[LocalChunk(src_chunk.buffer, src_chunk.index, src_chunk.size)],
        dst_buff=[RemoteChunk(dst_chunk.buffer, dst_chunk.index, dst_chunk.size, tb_chunk_id)],
        channel_ids=tb_channel_ids,
        channel_type=self.channel_type,
        from_packet=False,
        to_packet=True,
    )
    get_program().add_operation(self.src_rank, tb, op)
def read_put_packets(self, dst_chunk: Chunk, src_chunk: Chunk, tb: int):
"""Transfer data in packet format from local to remote scratch buffer.

View File

@@ -34,15 +34,31 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
inter_node_port_channels = {}
scratch_buffers = []
thread_block_offset = 1
thread_block_group = ThreadBlockGroup(
tb_list=[i for i in range(thread_block_offset, thread_block_offset + thread_block_group_size)]
thread_block_groups = []
global_intra_node_tbg = ThreadBlockGroup(
tb_list=[
i
for i in range(thread_block_offset, thread_block_offset + (gpus_per_node - 1) * thread_block_group_size)
]
)
for i in range(gpus_per_node - 1):
thread_block_groups.append(
ThreadBlockGroup(
tb_list=[
i
for i in range(
thread_block_offset + i * thread_block_group_size,
thread_block_offset + (i + 1) * thread_block_group_size,
)
]
)
)
scratch_buffer_size = packets_per_gpu * (total_gpus + 1)
for node_id in range(num_nodes):
for local_gpu_id in range(gpus_per_node):
current_rank_id = local_gpu_id + gpus_per_node * node_id
next_node_rank_id = (local_gpu_id + gpus_per_node * (node_id + 1)) % total_gpus
scratch_buffer_size = 2 * total_gpus
scratch_buffers.append(Buffer(current_rank_id, scratch_buffer_size))
for peer_gpu_id in range(gpus_per_node):
if peer_gpu_id != local_gpu_id:
@@ -64,13 +80,14 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
for peer_gpu_id in range(gpus_per_node):
peer_rank_id = peer_gpu_id + gpus_per_node * node_id
peer_data_offset = peer_gpu_id * packets_per_gpu
tbg_id = peer_gpu_id if peer_gpu_id < local_gpu_id else peer_gpu_id - 1
if peer_gpu_id != local_gpu_id:
intra_node_memory_channels[(peer_rank_id, current_rank_id)].put_packets(
scratch_buffers[peer_rank_id][
local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu
],
input_buffer[peer_data_offset : peer_data_offset + packets_per_gpu],
tb_group=thread_block_group,
tb_group=thread_block_groups[tbg_id],
)
# Intra Node Reduce
@@ -84,20 +101,24 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
current_rank.reduce(
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
other_gpu_data,
tb_group=thread_block_group,
tb_group=global_intra_node_tbg,
packet=True,
)
# Copy Reduced Data to Scratch Buffer and send to Next Node
current_rank.copy_packets(
scratch_buffers[current_rank_id][
local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu
],
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
tb_group=thread_block_group,
tb_group=global_intra_node_tbg,
)
current_rank.barrier(
tb_list=[i for i in range(thread_block_offset + (gpus_per_node - 1) * thread_block_group_size)]
)
inter_node_offset = total_gpus
inter_node_port_channels[current_rank_id].read_put_packets(
inter_node_port_channels[current_rank_id].put_packets(
scratch_buffers[next_node_rank_id][
inter_node_offset
+ local_gpu_id * packets_per_gpu : inter_node_offset
@@ -122,31 +143,39 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
current_rank.reduce(
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
inter_node_data,
tb_group=thread_block_group,
tb_group=global_intra_node_tbg,
packet=True,
)
current_rank.copy_packets(
scratch_buffers[current_rank_id][scratch_buffer_size - packets_per_gpu : scratch_buffer_size],
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
tb_group=global_intra_node_tbg,
)
# Broadcast Reduced Data
for peer_gpu_id in range(gpus_per_node):
peer_rank_id = peer_gpu_id + gpus_per_node * node_id
if peer_gpu_id != local_gpu_id:
intra_node_memory_channels[(peer_rank_id, current_rank_id)].put_packets(
tbg_id = peer_gpu_id if peer_gpu_id < local_gpu_id else peer_gpu_id - 1
intra_node_memory_channels[(peer_rank_id, current_rank_id)].read_put_packets(
scratch_buffers[peer_rank_id][
inter_node_offset
+ local_gpu_id * packets_per_gpu : inter_node_offset
+ local_gpu_id * packets_per_gpu
+ packets_per_gpu
],
input_buffer[
local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu
scratch_buffers[current_rank_id][
scratch_buffer_size - packets_per_gpu : scratch_buffer_size
],
tb_group=thread_block_group,
tb_group=thread_block_groups[tbg_id],
)
# Unpack Data Received from other GPUs in the same node
for peer_gpu_id in range(gpus_per_node):
if peer_gpu_id != local_gpu_id:
tbg_id = peer_gpu_id if peer_gpu_id < local_gpu_id else peer_gpu_id - 1
current_rank.unpack_packets(
input_buffer[
peer_gpu_id * packets_per_gpu : peer_gpu_id * packets_per_gpu + packets_per_gpu
@@ -157,7 +186,7 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
+ peer_gpu_id * packets_per_gpu
+ packets_per_gpu
],
tb_group=thread_block_group,
tb_group=thread_block_groups[tbg_id],
)
return prog

View File

@@ -604,6 +604,7 @@ class ReduceOperation(BaseOperation):
self,
local_src_buff: List[LocalChunk],
local_dst_buff: List[LocalChunk],
local_pkt_dst_buff: List[LocalChunk] = None,
remote_src_buff: List[RemoteChunk] = None,
remote_dst_buff: List[RemoteChunk] = None,
channel_ids: List[int] = None,
@@ -613,6 +614,7 @@ class ReduceOperation(BaseOperation):
tbg_info: ThreadBlockGroupInfo = None,
packet: bool = False,
):
local_pkt_dst_buff = local_pkt_dst_buff if local_pkt_dst_buff is not None else []
remote_src_buff = remote_src_buff if remote_src_buff is not None else []
remote_dst_buff = remote_dst_buff if remote_dst_buff is not None else []
channel_ids = channel_ids if channel_ids is not None else []
@@ -620,12 +622,18 @@ class ReduceOperation(BaseOperation):
if len(remote_src_buff) == 0 and len(remote_dst_buff) == 0:
if packet:
super().__init__(Instruction.reduce_packet)
if len(local_pkt_dst_buff) == 0:
super().__init__(Instruction.reduce_packet)
else:
super().__init__(Instruction.reduce_copy_packet)
else:
super().__init__(Instruction.reduce)
elif len(remote_src_buff) == 0:
if packet:
super().__init__(Instruction.reduce_send_packet)
if len(local_pkt_dst_buff) == 0:
super().__init__(Instruction.reduce_send_packet)
else:
super().__init__(Instruction.reduce_copy_send_packet)
else:
super().__init__(Instruction.reduce_send)
elif len(remote_dst_buff) == 0 and not packet:
@@ -637,6 +645,7 @@ class ReduceOperation(BaseOperation):
self.local_src_buff = local_src_buff
self.local_dst_buff = local_dst_buff
self.local_pkt_dst_buff = local_pkt_dst_buff
self.remote_src_buff = remote_src_buff
self.remote_dst_buff = remote_dst_buff
self.channel_ids = channel_ids
@@ -741,6 +750,49 @@ class ReduceOperation(BaseOperation):
tbg_info=self.tbg_info,
packet=self.packet,
)
if (
isinstance(other, CopyOperation)
and self.name == Instruction.reduce_packet
and other.name == Instruction.copy_packet
and self.local_dst_buff[0] == other.src_buff[0]
and self.tbg_info == other.tbg_info
):
fused_operation = ReduceOperation(
self.local_src_buff,
self.local_dst_buff,
local_pkt_dst_buff=other.dst_buff,
remote_src_buff=self.remote_src_buff,
remote_dst_buff=self.remote_dst_buff,
channel_ids=self.channel_ids,
put_channel_ids=self.put_channel_ids,
channel_type=self.channel_type,
reduce_operation=self.reduce_operation,
tbg_info=self.tbg_info,
packet=self.packet,
)
if (
isinstance(other, PutOperation)
and (self.name == Instruction.reduce_copy_packet or self.name == Instruction.reduce_copy_send_packet)
and (
(other.name == Instruction.put_packet and self.local_dst_buff[0] == other.src_buff[0])
or (other.name == Instruction.read_put_packet and self.local_pkt_dst_buff[0] == other.src_buff[0])
)
and other.channel_type == ChannelType.memory
and self.tbg_info == other.tbg_info
):
fused_operation = ReduceOperation(
self.local_src_buff,
self.local_dst_buff,
local_pkt_dst_buff=self.local_pkt_dst_buff,
remote_src_buff=self.remote_src_buff,
remote_dst_buff=self.remote_dst_buff + other.dst_buff,
channel_ids=self.channel_ids,
put_channel_ids=self.put_channel_ids + other.channel_ids,
channel_type=other.channel_type,
reduce_operation=self.reduce_operation,
tbg_info=self.tbg_info,
packet=self.packet,
)
return fused_operation
@@ -752,6 +804,8 @@ class ReduceOperation(BaseOperation):
result["dst_buff"] = []
for chunk in self.local_dst_buff:
result["dst_buff"].append(chunk.to_dict())
for chunk in self.local_pkt_dst_buff:
result["dst_buff"].append(chunk.to_dict())
if len(self.remote_src_buff) > 0:
for chunk in self.remote_src_buff:

View File

@@ -69,6 +69,7 @@ class Instruction(Enum):
unpack_packet = "upkt"
reduce = "re"
reduce_packet = "repkt"
reduce_copy_packet = "recpkt"
sem_acquire = "sem_acquire"
sem_release = "sem_release"
signal = "signal"
@@ -85,6 +86,7 @@ class Instruction(Enum):
put_with_signal_and_flush = "pwsf"
reduce_send = "res"
reduce_send_packet = "respkt"
reduce_copy_send_packet = "recspkt"
read_reduce = "rre"
read_reduce_send = "rres"
group_store = "gstore"

View File

@@ -0,0 +1,63 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Reduce-Copy-Packets Operation Test
This file demonstrates the use of reduce and copy operations in MSCCLPP with
packet format. The reduce-copy-packets pattern combines local reductions
with packet-based data copying, ensuring data integrity during the reduction
and copy process through the packet format.
WARNING: This algorithm is designed solely for demonstrating the use of a single
operation (reduce-copy-packets) and is NOT intended for production use. This test
may not work correctly in the MSCCLPP executor.
"""
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
def reduce_copy_packets_test(num_threads_per_block, min_message_size, max_message_size):
    """Build a single-GPU demo program exercising the reduce-copy-packets pattern."""
    # A single GPU suffices: both operations in this pattern are purely local.
    num_gpus = 1
    collective = TestCollective(num_gpus, 1, 0)
    with CollectiveProgram(
        "reduce_copy_packets_test",
        collective,
        num_gpus,
        protocol="LL",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        gpu0 = Rank(0)
        src_buffer = gpu0.get_input_buffer()
        # Two scratch chunks for the packet operations below.
        scratch = Buffer(0, 2)
        # Packet reduce whose destination chunk feeds the packet copy that follows —
        # the pair is eligible for fusion into a reduce-copy-packet operation.
        gpu0.reduce(src_buffer[0:1], [scratch[0:1]], tb=0, packet=True)
        gpu0.copy_packets(scratch[1:2], src_buffer[0:1], tb=0)
        print(JSON())
if __name__ == "__main__":
    # Parse CLI arguments only when run as a script, keeping the module importable
    # (e.g. by test collectors) without side effects.
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    reduce_copy_packets_test(args.num_threads_per_block, args.min_message_size, args.max_message_size)

View File

@@ -0,0 +1,69 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Reduce-Copy-Send-Packets Operation Test
This file demonstrates the use of reduce, copy, and send operations in MSCCLPP with
packet format. The reduce-copy-send-packets pattern combines local reductions,
packet-based data copying, and remote data transfers, ensuring data integrity
during distributed GPU communication through the packet format.
WARNING: This algorithm is designed solely for demonstrating the use of a single
operation (reduce-copy-send-packets) and is NOT intended for production use. This test
may not work correctly in the MSCCLPP executor.
"""
import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *
def reduce_copy_send_packets_test(num_threads_per_block, min_message_size, max_message_size):
    """Build a 2-GPU demo program exercising the reduce-copy-send-packets pattern."""
    # Two GPUs: rank 0 reduces/copies locally and then sends packets to rank 1.
    num_gpus = 2
    collective = TestCollective(num_gpus, 1, 0)
    with CollectiveProgram(
        "reduce_copy_send_packets_test",
        collective,
        num_gpus,
        protocol="LL",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        gpu0 = Rank(0)
        src_buffer = gpu0.get_input_buffer()
        # Scratch space: two chunks on rank 0, one chunk on rank 1 (the remote target).
        scratch = [Buffer(0, 2), Buffer(1, 1)]
        # Memory channel carrying the packet send from rank 0 to rank 1.
        channel = MemoryChannel(1, 0)
        # Packet reduce, packet copy, then a packet put over the channel — the
        # chain is eligible for fusion into a reduce-copy-send-packet operation.
        gpu0.reduce(src_buffer[0:1], [scratch[0][0:1]], tb=0, packet=True)
        gpu0.copy_packets(scratch[0][1:2], src_buffer[0:1], tb=0)
        channel.put_packets(scratch[1][0:1], src_buffer[0:1], tb=0)
        print(JSON())
if __name__ == "__main__":
    # Parse CLI arguments only when run as a script, keeping the module importable
    # (e.g. by test collectors) without side effects.
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
    args = parser.parse_args()
    reduce_copy_send_packets_test(args.num_threads_per_block, args.min_message_size, args.max_message_size)