From 02eb2cfc2e8bf7fa9f630c31d69c83ec90bcae65 Mon Sep 17 00:00:00 2001 From: Empyreus Date: Thu, 11 Jun 2026 20:46:09 +0000 Subject: [PATCH] add support for allgather packet for small message sizes --- include/mscclpp/npkit/npkit_event.hpp | 2 +- python/mscclpp/language/channel.py | 65 +++++++++++++++++++ .../mscclpp/language/internal/operations.py | 32 +++++++++ python/mscclpp/language/internal/types.py | 1 + src/core/executor/execution_plan.cc | 2 + src/core/include/execution_common.hpp | 1 + src/core/include/execution_kernel.hpp | 22 +++++++ tools/npkit/npkit_trace_generator.py | 1 + 8 files changed, 125 insertions(+), 1 deletion(-) diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp index 60429193..5084638e 100644 --- a/include/mscclpp/npkit/npkit_event.hpp +++ b/include/mscclpp/npkit/npkit_event.hpp @@ -41,6 +41,6 @@ #define NPKIT_EVENT_KERNEL_ALLREDUCE_EXIT 0x1C #define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x1D -#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x39 +#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x3B #endif diff --git a/python/mscclpp/language/channel.py b/python/mscclpp/language/channel.py index de0f65c5..34aa3efb 100644 --- a/python/mscclpp/language/channel.py +++ b/python/mscclpp/language/channel.py @@ -959,6 +959,51 @@ class SwitchChannel: op = GroupStore(src_chunk, self.buffer_type, buffer_offset, size, tb_channel_ids, self.channel_type) get_program().add_operation(self.src_rank, tb, op) + def broadcast_packets(self, rank, src_chunk: Chunk, buffer_offset, size, tb): + """Broadcast packet-formatted data from source chunk to all ranks in the switch channel. + + Packet variant of :meth:`broadcast`. Emits a ``gstorepkt`` (MULTI_STORE_PKT) + operation that multicasts LL-protocol packets (data + flag) from the source + chunk to the specified buffer region across all ranks in the rank group, with + no explicit barrier required (the packet flag provides synchronization). + + Args: + rank (int): The rank that will execute this broadcast operation. + src_chunk (Chunk): The source chunk containing packet data to broadcast. + buffer_offset (int): The offset in the destination buffer where data will be stored. + size (int): The size of data to broadcast. + tb (int): The thread block ID that will execute this operation. + + Raises: + RuntimeError: If src_chunk rank is not in the rank group, if chunk size + doesn't match the required size, or if buffer size is insufficient. + + Example: + >>> channel.broadcast_packets(rank=0, src_chunk=chunk, buffer_offset=0, size=1, tb=0) + """ + self.src_rank = rank + if src_chunk.rank not in self.rank_group.ranks: + raise RuntimeError( + f"Destination chunk rank {src_chunk.rank} is not part of the rank group {self.rank_group.ranks}." + ) + if src_chunk.size != size: + raise RuntimeError(f"Destination chunk size {src_chunk.size} does not match the required size {size}.") + + for rank in self.rank_group.ranks: + if self.buffer_type == BufferType.scratch: + buffer_size = get_program().gpus[rank].scratch_chunks + else: + buffer_size = get_program().buffers[rank][self.buffer_type].size + + if buffer_size < buffer_offset + size: + raise RuntimeError( + f"Buffer size {buffer_size} is smaller than required size {buffer_offset + size} for rank {rank}." + ) + + tb_channel_ids = get_program().setup_channel(tb, self) + op = GroupStorePacket(src_chunk, self.buffer_type, buffer_offset, size, tb_channel_ids, self.channel_type) + get_program().add_operation(self.src_rank, tb, op) + class SwitchChannelRankView: """A rank-specific view of a SwitchChannel for performing operations. @@ -1022,3 +1067,23 @@ class SwitchChannel: >>> rank_view.broadcast(src_chunk=chunk, buffer_offset=0, size=1, tb=0) """ return self._channel.broadcast(self._rank, src_chunk, buffer_offset, size, tb) + + def broadcast_packets(self, src_chunk: Chunk, buffer_offset, size, tb): + """Perform a packet broadcast operation from this rank's perspective. + + Convenience method that calls the underlying channel's broadcast_packets + method with this view's rank automatically provided. + + Args: + src_chunk (Chunk): The source chunk containing packet data to broadcast. + buffer_offset (int): The offset in the destination buffer where data will be stored. + size (int): The size of data to broadcast. + tb (int): The thread block ID that will execute this operation. + + Returns: + The result of the underlying channel's broadcast_packets operation. + + Example: + >>> rank_view.broadcast_packets(src_chunk=chunk, buffer_offset=0, size=1, tb=0) + """ + return self._channel.broadcast_packets(self._rank, src_chunk, buffer_offset, size, tb) diff --git a/python/mscclpp/language/internal/operations.py b/python/mscclpp/language/internal/operations.py index b181c5fe..f1079345 100644 --- a/python/mscclpp/language/internal/operations.py +++ b/python/mscclpp/language/internal/operations.py @@ -934,6 +934,38 @@ class GroupStore(BaseOperation): return result +class GroupStorePacket(BaseOperation): + def __init__( + self, + src_chunk: Chunk, + buffer_type: BufferType, + buffer_offset: int, + size: int, + channel_ids: List[int], + channel_type: ChannelType = ChannelType.switch, + ): + super().__init__(Instruction.group_store_packet) + self.src_chunk = src_chunk + self.buffer_type = buffer_type + self.buffer_offset = buffer_offset + self.size = size + self.channel_ids = channel_ids + self.channel_type = channel_type + + def shift_buffers(self, instance, num_instances, replication_function): + self.buffer_offset = replication_function(self.buffer_offset, self.size, instance, num_instances) + self.src_chunk.index = replication_function(self.src_chunk.index, self.size, instance, num_instances) + + def to_dict(self): + result = {"name": self.name.value} + result["src_buff"] = [{"type": self.src_chunk.buffer.value, "index": self.src_chunk.index, "size": self.size}] + result["dst_buff"] = [ + {"switch_channel_id": self.channel_ids[0], "index": self.buffer_offset, "size": self.size} + ] + result["channel_type"] = self.channel_type.value + return result + + @dataclass class GroupLoadReduceStore(BaseOperation): def __init__( diff --git a/python/mscclpp/language/internal/types.py b/python/mscclpp/language/internal/types.py index 9bfe1c76..392282f4 100644 --- a/python/mscclpp/language/internal/types.py +++ b/python/mscclpp/language/internal/types.py @@ -90,6 +90,7 @@ class Instruction(Enum): read_reduce = "rre" read_reduce_send = "rres" group_store = "gstore" + group_store_packet = "gstorepkt" group_load_reduce = "glre" group_load_reduce_store = "glres" pipeline = "pipeline" diff --git a/src/core/executor/execution_plan.cc b/src/core/executor/execution_plan.cc index 09bd7062..a7619f7e 100644 --- a/src/core/executor/execution_plan.cc +++ b/src/core/executor/execution_plan.cc @@ -71,6 +71,8 @@ auto getOpType = [](const std::string& str) { return mscclpp::OperationType::MULTI_LOAD_REDUCE_STORE; } else if (str == "gstore") { return mscclpp::OperationType::MULTI_STORE; + } else if (str == "gstorepkt") { + return mscclpp::OperationType::MULTI_STORE_PKT; } else if (str == "rlxsignal") { return mscclpp::OperationType::RELAXED_SIGNAL; } else if (str == "rlxwait") { diff --git a/src/core/include/execution_common.hpp b/src/core/include/execution_common.hpp index e6d90ca6..6ce47d8f 100644 --- a/src/core/include/execution_common.hpp +++ b/src/core/include/execution_common.hpp @@ -67,6 +67,7 @@ enum class OperationType : uint8_t { SEM_RELEASE, SEM_ACQUIRE, MULTI_STORE, + MULTI_STORE_PKT, }; struct Channels { diff --git a/src/core/include/execution_kernel.hpp b/src/core/include/execution_kernel.hpp index c74f2127..b14c4fad 100644 --- a/src/core/include/execution_kernel.hpp +++ b/src/core/include/execution_kernel.hpp @@ -629,6 +629,26 @@ MSCCLPP_DEVICE_INLINE void handleMultiStore(const Operation& op, void* input, vo } #endif +#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 +template +MSCCLPP_DEVICE_INLINE void handleMultiStorePkt(const Operation& op, void* input, void* output, void* scratch) { + const uint32_t srcOffset = op.inputOffsets[0]; + const uint32_t dstOffset = op.outputOffsets[0]; + const uint32_t size = op.inputBufferSizes[0]; + uint32_t nPackets = size / sizeof(PacketPayload); + + PacketType* srcPackets = + (PacketType*)((char*)getBuffer(input, output, scratch, op.inputBufferRefs[0].type) + (srcOffset << 1)); + PacketType* multiPkt = (PacketType*)((char*)nvlsChannels_[op.nvlsOutputIndex].mcPtr + scratchOffset_ + (dstOffset << 1)); + + for (size_t idx = threadIdx.x; idx < nPackets; idx += blockDim.x) { + PacketPayload data = srcPackets[idx].read(flag_); + PacketType pkt(data, flag_); + mscclpp::SwitchChannelDeviceHandle::multimemStore(*(mscclpp::f32x4*)(&pkt), multiPkt + idx); + } +} +#endif + template MSCCLPP_DEVICE_INLINE void handlePipeline(const Operation& op, T* input, T* output, T* scratch #if defined(ENABLE_NPKIT) @@ -762,6 +782,8 @@ MSCCLPP_DEVICE_INLINE void executeDeviceFunction(const Operation& op, T* input, handleMultiLoadReduceStore(op, offset, unitSize); } else if (opType == OperationType::MULTI_STORE) { handleMultiStore(op, input, output, scratch, offset, unitSize); + } else if (opType == OperationType::MULTI_STORE_PKT) { + handleMultiStorePkt(op, input, output, scratch); } #endif else if (opType == OperationType::PIPELINE) { diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py index 3897c514..52f899dc 100644 --- a/tools/npkit/npkit_trace_generator.py +++ b/tools/npkit/npkit_trace_generator.py @@ -40,6 +40,7 @@ def parse_npkit_event_header(npkit_event_header_path): "SEM_RELEASE", "SEM_ACQUIRE", "MULTI_STORE", + "MULTI_STORE_PKT", ] executor_op_to_offset = {} for executor_op in executor_ops: