mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-13 17:55:52 +00:00
Fix lint and ROCm error alias
Agent-Logs-Url: https://github.com/microsoft/mscclpp/sessions/0f0e525d-a69c-4ff7-8913-983243b5cbf7 Co-authored-by: Binyang2014 <9415966+Binyang2014@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
0c09239b06
commit
7724e49f31
@@ -31,6 +31,7 @@ using CUmemorytype = hipMemoryType;
|
||||
constexpr auto cudaErrorPeerAccessAlreadyEnabled = hipErrorPeerAccessAlreadyEnabled;
|
||||
constexpr auto cudaErrorContextIsDestroyed = hipErrorContextIsDestroyed;
|
||||
constexpr auto cudaErrorInvalidDevice = hipErrorInvalidDevice;
|
||||
constexpr auto cudaErrorInvalidValue = hipErrorInvalidValue;
|
||||
constexpr auto cudaSuccess = hipSuccess;
|
||||
constexpr auto cudaErrorNotSupported = hipErrorNotSupported;
|
||||
constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking;
|
||||
|
||||
@@ -31,23 +31,23 @@ ScopeGuard<Fn> makeScopeGuard(Fn fn) {
|
||||
template <typename T, typename Impl, typename Fn>
|
||||
std::shared_future<T> makeOrderedRecvFuture(Impl* impl, int remoteRank, int tag, Fn fn) {
|
||||
auto thisRecvItem = std::make_shared<std::weak_ptr<BaseRecvItem>>();
|
||||
auto future = std::async(std::launch::deferred, [impl, remoteRank, tag, thisRecvItem,
|
||||
lastRecvItem = impl->getLastRecvItem(remoteRank, tag),
|
||||
fn = std::move(fn)]() mutable {
|
||||
[[maybe_unused]] auto cleanup = makeScopeGuard([impl, remoteRank, tag, thisRecvItem]() {
|
||||
auto item = thisRecvItem->lock();
|
||||
auto it = impl->lastRecvItems_.find({remoteRank, tag});
|
||||
if (item && it != impl->lastRecvItems_.end() && it->second == item) {
|
||||
impl->lastRecvItems_.erase(it);
|
||||
}
|
||||
});
|
||||
auto future = std::async(std::launch::deferred,
|
||||
[impl, remoteRank, tag, thisRecvItem, lastRecvItem = impl->getLastRecvItem(remoteRank, tag),
|
||||
fn = std::move(fn)]() mutable {
|
||||
[[maybe_unused]] auto cleanup = makeScopeGuard([impl, remoteRank, tag, thisRecvItem]() {
|
||||
auto item = thisRecvItem->lock();
|
||||
auto it = impl->lastRecvItems_.find({remoteRank, tag});
|
||||
if (item && it != impl->lastRecvItems_.end() && it->second == item) {
|
||||
impl->lastRecvItems_.erase(it);
|
||||
}
|
||||
});
|
||||
|
||||
if (lastRecvItem) {
|
||||
// Recursive call to the previous receive items
|
||||
lastRecvItem->wait();
|
||||
}
|
||||
return fn();
|
||||
});
|
||||
if (lastRecvItem) {
|
||||
// Recursive call to the previous receive items
|
||||
lastRecvItem->wait();
|
||||
}
|
||||
return fn();
|
||||
});
|
||||
auto sharedFuture = std::shared_future<T>(std::move(future));
|
||||
auto recvItem = std::make_shared<RecvItem<T>>(sharedFuture);
|
||||
*thisRecvItem = recvItem;
|
||||
@@ -156,13 +156,13 @@ MSCCLPP_API_CPP std::shared_future<Connection> Communicator::connect(const Endpo
|
||||
|
||||
return makeOrderedRecvFuture<Connection>(pimpl_.get(), remoteRank, tag,
|
||||
[this, remoteRank, tag, localEndpoint]() mutable {
|
||||
std::vector<char> data;
|
||||
bootstrap()->recv(data, remoteRank, tag);
|
||||
auto remoteEndpoint = Endpoint::deserialize(data);
|
||||
auto connection = context()->connect(localEndpoint, remoteEndpoint);
|
||||
pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag};
|
||||
return connection;
|
||||
});
|
||||
std::vector<char> data;
|
||||
bootstrap()->recv(data, remoteRank, tag);
|
||||
auto remoteEndpoint = Endpoint::deserialize(data);
|
||||
auto connection = context()->connect(localEndpoint, remoteEndpoint);
|
||||
pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag};
|
||||
return connection;
|
||||
});
|
||||
}
|
||||
|
||||
MSCCLPP_API_CPP std::shared_future<Connection> Communicator::connect(const EndpointConfig& localConfig, int remoteRank,
|
||||
|
||||
Reference in New Issue
Block a user