From d6360933364255d38ca5751b1a200475e0abd60e Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Thu, 8 May 2025 15:01:51 -0700 Subject: [PATCH] Asynchronous setup (#514) Cherry-picked a subset of the features from #167: `Communicator::setup()` is no longer needed. `Communicator::sendMemory()` performs its task inline, and `Communicator::recvMemory()` and `Communicator::connect()` perform their tasks asynchronously without an explicit setup. --- include/mscclpp/core.hpp | 118 ++++++++++++---------------------- include/mscclpp/semaphore.hpp | 2 +- python/mscclpp/core_py.cpp | 4 +- src/communicator.cc | 96 +++++---------------------- src/core.cc | 4 -- src/include/communicator.hpp | 1 - test/unit/core_tests.cc | 14 ---- 7 files changed, 59 insertions(+), 180 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 1078e32e..57fad56d 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -557,64 +557,19 @@ class Context { friend class Endpoint; }; -/// A base class for objects that can be set up during @ref Communicator::setup(). -struct Setuppable { - virtual ~Setuppable() = default; - - /// Called inside @ref Communicator::setup() before any call to @ref endSetup() of any @ref Setuppable object that is - /// being set up within the same @ref Communicator::setup() call. - /// - /// @param bootstrap A shared pointer to the bootstrap implementation. - virtual void beginSetup(std::shared_ptr bootstrap); - - /// Called inside @ref Communicator::setup() after all calls to @ref beginSetup() of all @ref Setuppable objects that - /// are being set up within the same @ref Communicator::setup() call. - /// - /// @param bootstrap A shared pointer to the bootstrap implementation. - virtual void endSetup(std::shared_ptr bootstrap); -}; - -/// A non-blocking future that can be used to check if a value is ready and retrieve it. template -class NonblockingFuture { - std::shared_future future; - - public: - /// Default constructor. 
- NonblockingFuture() = default; - - /// Constructor that takes a shared future and moves it into the NonblockingFuture. - /// - /// @param future The shared future to move. - NonblockingFuture(std::shared_future&& future) : future(std::move(future)) {} - - /// Check if the value is ready to be retrieved. - /// - /// @return True if the value is ready, false otherwise. - bool ready() const { return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready; } - - /// Get the value. - /// - /// @return The value. - /// - /// @throws Error if the value is not ready. - T get() const { - if (!ready()) throw Error("NonblockingFuture::get() called before ready", ErrorCode::InvalidUsage); - return future.get(); - } -}; +using NonblockingFuture [[deprecated("Use std::shared_future instead. This will be removed in a future release.")]] = + std::shared_future; /// A class that sets up all registered memories and connections between processes. /// /// A typical way to use this class: -/// 1. Call @ref connectOnSetup() to declare connections between the calling process with other processes. -/// 2. Call @ref registerMemory() to register memory regions that will be used for communication. -/// 3. Call @ref sendMemoryOnSetup() or @ref recvMemoryOnSetup() to send/receive registered memory regions to/from +/// 1. Call connect() to declare connections between the calling process with other processes. +/// 2. Call registerMemory() to register memory regions that will be used for communication. +/// 3. Call sendMemory() or recvMemory() to send/receive registered memory regions to/from /// other processes. -/// 4. Call @ref setup() to set up all registered memories and connections declared in the previous steps. -/// 5. Call @ref NonblockingFuture::get() to get the registered memory regions received from other -/// processes. -/// 6. All done; use connections and registered memories to build channels. +/// 4. Call get() on all futures returned by connect() and recvMemory(). 
+/// 5. All done; use connections and registered memories to build channels. /// class Communicator { public: @@ -645,30 +600,42 @@ class Communicator { /// @return RegisteredMemory A handle to the buffer. RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); - /// Send information of a registered memory to the remote side on setup. + /// Send information of a registered memory to the remote side. /// - /// This function registers a send to a remote process that will happen by a following call of @ref setup(). The send - /// will carry information about a registered memory on the local process. + /// The send will be performed immediately upon calling this function. /// /// @param memory The registered memory buffer to send information about. /// @param remoteRank The rank of the remote process. /// @param tag The tag to use for identifying the send. - void sendMemoryOnSetup(RegisteredMemory memory, int remoteRank, int tag); + void sendMemory(RegisteredMemory memory, int remoteRank, int tag); - /// Receive memory on setup. + [[deprecated("Use sendMemory() instead. This will be removed in a future release.")]] void sendMemoryOnSetup( + RegisteredMemory memory, int remoteRank, int tag) { + sendMemory(memory, remoteRank, tag); + } + + /// Receive memory information from a corresponding sendMemory call on the remote side. /// - /// This function registers a receive from a remote process that will happen by a following call of @ref setup(). The - /// receive will carry information about a registered memory on the remote process. + /// This function returns a future immediately. The actual receive will be performed upon calling + /// the first get() on the future. /// /// @param remoteRank The rank of the remote process. /// @param tag The tag to use for identifying the receive. - /// @return NonblockingFuture A non-blocking future of registered memory. 
- NonblockingFuture recvMemoryOnSetup(int remoteRank, int tag); + /// @return std::shared_future A non-blocking future of registered memory. + std::shared_future recvMemory(int remoteRank, int tag); - /// Connect to a remote rank on setup. + [[deprecated( + "Use recvMemory() instead. This will be removed in a future release.")]] NonblockingFuture + recvMemoryOnSetup(int remoteRank, int tag) { + return recvMemory(remoteRank, tag); + } + + /// Connect to a remote rank. /// - /// This function only prepares metadata for connection. The actual connection is made by a following call of - /// @ref setup(). Note that this function is two-way and a connection from rank `i` to remote rank `j` needs + /// This function will immediately send metadata about the local endpoint to the remote rank, and return a future + /// without waiting for the remote rank to respond. The connection will be established when the remote rank + /// responds with its own endpoint and the local rank calls the first get() on the future. + /// Note that this function is two-way and a connection from rank `i` to remote rank `j` needs /// to have a counterpart from rank `j` to rank `i`. Note that with IB, buffers are registered at a page level and if /// a buffer is spread through multiple pages and do not fully utilize all of them, IB's QP has to register for all /// involved pages. This potentially has security risks if the connection's accesses are given to a malicious process. @@ -676,9 +643,14 @@ class Communicator { /// @param remoteRank The rank of the remote process. /// @param tag The tag of the connection for identifying it. /// @param config The configuration for the local endpoint. - /// @return NonblockingFuture>> A non-blocking future of shared pointer - /// to the connection. - NonblockingFuture> connectOnSetup(int remoteRank, int tag, EndpointConfig localConfig); + /// @return std::shared_future> A non-blocking future of shared pointer to the connection. 
+ std::shared_future> connect(int remoteRank, int tag, EndpointConfig localConfig); + + [[deprecated("Use connect() instead. This will be removed in a future release.")]] NonblockingFuture< + std::shared_ptr> + connectOnSetup(int remoteRank, int tag, EndpointConfig localConfig) { + return connect(remoteRank, tag, localConfig); + } /// Get the remote rank a connection is connected to. /// @@ -692,17 +664,7 @@ class Communicator { /// @return The tag the connection was made with. int tagOf(const Connection& connection); - /// Add a custom Setuppable object to a list of objects to be setup later, when @ref setup() is called. - /// - /// @param setuppable A shared pointer to the Setuppable object. - void onSetup(std::shared_ptr setuppable); - - /// Setup all objects that have registered for setup. - /// - /// This includes previous calls of @ref sendMemoryOnSetup(), @ref recvMemoryOnSetup(), @ref connectOnSetup(), and - /// @ref onSetup(). It is allowed to call this function multiple times, where the n-th call will only setup objects - /// that have been registered after the (n-1)-th call. - void setup(); + [[deprecated("setup() is now no-op and no longer needed. This will be removed in a future release.")]] void setup() {} private: // The interal implementation. diff --git a/include/mscclpp/semaphore.hpp b/include/mscclpp/semaphore.hpp index 55dbbe74..49e2687a 100644 --- a/include/mscclpp/semaphore.hpp +++ b/include/mscclpp/semaphore.hpp @@ -30,7 +30,7 @@ template