diff --git a/include/mscclpp/gpu.hpp b/include/mscclpp/gpu.hpp index b8d096e2..b289bd4d 100644 --- a/include/mscclpp/gpu.hpp +++ b/include/mscclpp/gpu.hpp @@ -31,6 +31,7 @@ using CUmemorytype = hipMemoryType; constexpr auto cudaErrorPeerAccessAlreadyEnabled = hipErrorPeerAccessAlreadyEnabled; constexpr auto cudaErrorContextIsDestroyed = hipErrorContextIsDestroyed; constexpr auto cudaErrorInvalidDevice = hipErrorInvalidDevice; +constexpr auto cudaErrorInvalidValue = hipErrorInvalidValue; constexpr auto cudaSuccess = hipSuccess; constexpr auto cudaErrorNotSupported = hipErrorNotSupported; constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking; diff --git a/src/core/communicator.cc b/src/core/communicator.cc index 97fadbbd..81cd7bbe 100644 --- a/src/core/communicator.cc +++ b/src/core/communicator.cc @@ -31,23 +31,23 @@ ScopeGuard makeScopeGuard(Fn fn) { template std::shared_future makeOrderedRecvFuture(Impl* impl, int remoteRank, int tag, Fn fn) { auto thisRecvItem = std::make_shared>(); - auto future = std::async(std::launch::deferred, [impl, remoteRank, tag, thisRecvItem, - lastRecvItem = impl->getLastRecvItem(remoteRank, tag), - fn = std::move(fn)]() mutable { - [[maybe_unused]] auto cleanup = makeScopeGuard([impl, remoteRank, tag, thisRecvItem]() { - auto item = thisRecvItem->lock(); - auto it = impl->lastRecvItems_.find({remoteRank, tag}); - if (item && it != impl->lastRecvItems_.end() && it->second == item) { - impl->lastRecvItems_.erase(it); - } - }); + auto future = std::async(std::launch::deferred, + [impl, remoteRank, tag, thisRecvItem, lastRecvItem = impl->getLastRecvItem(remoteRank, tag), + fn = std::move(fn)]() mutable { + [[maybe_unused]] auto cleanup = makeScopeGuard([impl, remoteRank, tag, thisRecvItem]() { + auto item = thisRecvItem->lock(); + auto it = impl->lastRecvItems_.find({remoteRank, tag}); + if (item && it != impl->lastRecvItems_.end() && it->second == item) { + impl->lastRecvItems_.erase(it); + } + }); - if (lastRecvItem) { - // Recursive call to the previous receive items - lastRecvItem->wait(); - } - return fn(); - }); + if (lastRecvItem) { + // Recursive call to the previous receive items + lastRecvItem->wait(); + } + return fn(); + }); auto sharedFuture = std::shared_future(std::move(future)); auto recvItem = std::make_shared>(sharedFuture); *thisRecvItem = recvItem; @@ -156,13 +156,13 @@ MSCCLPP_API_CPP std::shared_future Communicator::connect(const Endpo return makeOrderedRecvFuture(pimpl_.get(), remoteRank, tag, [this, remoteRank, tag, localEndpoint]() mutable { - std::vector data; - bootstrap()->recv(data, remoteRank, tag); - auto remoteEndpoint = Endpoint::deserialize(data); - auto connection = context()->connect(localEndpoint, remoteEndpoint); - pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag}; - return connection; - }); + std::vector data; + bootstrap()->recv(data, remoteRank, tag); + auto remoteEndpoint = Endpoint::deserialize(data); + auto connection = context()->connect(localEndpoint, remoteEndpoint); + pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag}; + return connection; + }); } MSCCLPP_API_CPP std::shared_future Communicator::connect(const EndpointConfig& localConfig, int remoteRank,