From 76fdd1db7ab8a53330e8024c9e261f07c302056f Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Sat, 11 Apr 2026 04:53:49 +0000
Subject: [PATCH] WIP

---
Review notes (free-form text placed after the three-dash line, outside the
commit message):
  1. Leading whitespace inside the hunks below was reconstructed from Python
     block structure; verify the context lines against the target files
     before applying.
  2. SendRecv.__init__ forwards `inplace` to Collective.__init__, but
     SendRecv.init_buffers never reads it: every rank always gets separate
     input and output BaseBuffer entries spanning 0..chunk_factor.
     Confirm that ignoring `inplace` is intended.
  3. The subject is still "WIP"; give it a descriptive subject before merging.

 .../{mscclpp_send_recv.py => send_recv.py}    |  2 +-
 python/mscclpp/language/collectives.py        | 43 +++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)
 rename python/mscclpp/default_algos/{mscclpp_send_recv.py => send_recv.py} (98%)

diff --git a/python/mscclpp/default_algos/mscclpp_send_recv.py b/python/mscclpp/default_algos/send_recv.py
similarity index 98%
rename from python/mscclpp/default_algos/mscclpp_send_recv.py
rename to python/mscclpp/default_algos/send_recv.py
index caa0575d..2127eb91 100644
--- a/python/mscclpp/default_algos/mscclpp_send_recv.py
+++ b/python/mscclpp/default_algos/send_recv.py
@@ -11,7 +11,7 @@ from mscclpp.language.collectives import *
 
 def send_recv_test(name, nnodes, gpus_per_node, split_mask):
     gpu_size = nnodes * gpus_per_node
-    collective = TestCollective(gpu_size, 1, 1)
+    collective = SendRecv(gpu_size, 1, False)
     with CollectiveProgram(
         name,
         collective,
diff --git a/python/mscclpp/language/collectives.py b/python/mscclpp/language/collectives.py
index 55c0e6b6..01c766ba 100644
--- a/python/mscclpp/language/collectives.py
+++ b/python/mscclpp/language/collectives.py
@@ -236,3 +236,46 @@ class AllToAll(Collective):
             }
             rank_buffers.append(buffers)
         return rank_buffers
+
+
+class SendRecv(Collective):
+    """A SendRecv collective communication pattern.
+
+    SendRecv performs a point-to-point send/receive operation in a ring topology.
+    Each rank sends its input buffer to the next rank and receives data from the
+    previous rank into its output buffer.
+
+    This operation creates input and output buffers both sized by chunk_factor,
+    as each rank sends and receives the same amount of data.
+    """
+
+    def __init__(self, num_ranks, chunk_factor, inplace):
+        """Initialize a new SendRecv collective.
+
+        Args:
+            num_ranks (int): The number of ranks participating in the SendRecv.
+            chunk_factor (int): The size factor for data chunks.
+            inplace (bool): Whether the operation should be performed in-place.
+
+        Example:
+            >>> sendrecv = SendRecv(num_ranks=4, chunk_factor=1, inplace=False)
+        """
+        Collective.__init__(self, num_ranks, chunk_factor, inplace)
+        self.name = "sendrecv"
+
+    def init_buffers(self):
+        """Initialize buffers for the SendRecv operation.
+
+        Creates input and output buffers both sized by chunk_factor.
+
+        Returns:
+            list: A list of buffer dictionaries, one for each rank.
+        """
+        rank_buffers = []
+        for rank in range(self.num_ranks):
+            buffers = {
+                BufferType.input: BaseBuffer(rank, BufferType.input, 0, self.chunk_factor),
+                BufferType.output: BaseBuffer(rank, BufferType.output, 0, self.chunk_factor),
+            }
+            rank_buffers.append(buffers)
+        return rank_buffers