Fix for multi-node test (#614)

Fix multi-node test

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Binyang Li
2025-08-14 20:44:43 -07:00
committed by GitHub
parent 671b688bb3
commit 03c0ff2a91
3 changed files with 36 additions and 26 deletions

View File

@@ -201,6 +201,7 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
}
// Next decide how to set this->data
this->data = nullptr;
if (getHostHash() == this->hostHash && getPidHash() == this->pidHash) {
// The memory is local to the process, so originalDataPtr is valid as is
this->data = this->originalDataPtr;
@@ -211,22 +212,32 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
if (this->isCuMemMapAlloc) {
#if (CUDA_NVLS_API_AVAILABLE)
CUmemGenericAllocationHandle handle;
if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType()));
if (getHostHash() != this->hostHash) {
// TODO: only open handle if in same MNNVL domain
CUresult err = cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType());
if (err != CUDA_SUCCESS) {
INFO(MSCCLPP_P2P, "Failed to import shareable handle from host: 0x%lx, may not be in the same MNNVL domain",
hostHash);
return;
}
} else {
int rootPidFd = syscall(SYS_pidfd_open, entry.rootPid, 0);
if (rootPidFd < 0) {
throw SysError("pidfd_open() failed", errno);
if (getNvlsMemHandleType() == CU_MEM_HANDLE_TYPE_FABRIC) {
MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType()));
} else {
int rootPidFd = syscall(SYS_pidfd_open, entry.rootPid, 0);
if (rootPidFd < 0) {
throw SysError("pidfd_open() failed", errno);
}
int fd = syscall(SYS_pidfd_getfd, rootPidFd, entry.fileDesc, 0);
if (fd < 0) {
throw SysError("pidfd_getfd() failed", errno);
}
INFO(MSCCLPP_P2P, "Get file descriptor %d from pidfd %d on peer 0x%lx", fd, rootPidFd, hostHash);
MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, reinterpret_cast<void*>(fd),
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
close(rootPidFd);
close(fd);
}
int fd = syscall(SYS_pidfd_getfd, rootPidFd, entry.fileDesc, 0);
if (fd < 0) {
throw SysError("pidfd_getfd() failed", errno);
}
INFO(MSCCLPP_P2P, "Get file descriptor %d from pidfd %d on peer 0x%lx", fd, rootPidFd, hostHash);
MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, reinterpret_cast<void*>(fd),
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
close(rootPidFd);
close(fd);
}
size_t minGran = detail::getMulticastGranularity(this->baseDataSize, CU_MULTICAST_GRANULARITY_MINIMUM);
size_t recommendedGran =
@@ -240,14 +251,13 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
throw Error("CUDA does not support NVLS. Please ensure your CUDA version supports NVLS to use this feature.",
ErrorCode::InvalidUsage);
#endif
} else {
} else if (getHostHash() == this->hostHash) {
MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&base, entry.cudaIpcBaseHandle, cudaIpcMemLazyEnablePeerAccess));
this->data = static_cast<char*>(base) + entry.cudaIpcOffsetFromBase;
}
}
if (this->data != nullptr) {
INFO(MSCCLPP_P2P, "Opened CUDA IPC handle at pointer %p", this->data);
} else {
// No valid data pointer can be set
this->data = nullptr;
}
}

View File

@@ -414,8 +414,8 @@ void BaseTestEngine::setupMeshConnections(std::vector<DeviceHandle<mscclpp::Port
mscclpp::RegisteredMemory& localRegMemory = (outputBuff) ? outputBufRegMem : inputBufRegMem;
// store memory to keep resource alive
inputMemory_ = inputBufRegMem;
outputMemory_ = outputBufRegMem;
inputMemories_.push_back(inputBufRegMem);
outputMemories_.push_back(outputBufRegMem);
setupMeshConnectionsInternal(connections, localRegMemory, remoteRegMemories);
if (setupChannel != nullptr) {
@@ -446,8 +446,8 @@ void BaseTestEngine::setupMeshConnections(std::vector<mscclpp::MemoryChannel>& m
mscclpp::RegisteredMemory& localRegMemory =
(outputBuff && semantic == ChannelSemantic::PUT) ? outputBufRegMem : inputBufRegMem;
// store memory to keep resource alive
inputMemory_ = inputBufRegMem;
outputMemory_ = outputBufRegMem;
inputMemories_.push_back(inputBufRegMem);
outputMemories_.push_back(outputBufRegMem);
setupMeshConnectionsInternal(connections, localRegMemory, remoteRegMemories);
std::unordered_map<size_t, std::vector<std::shared_ptr<mscclpp::MemoryDevice2DeviceSemaphore>>> memorySemaphores;
@@ -498,8 +498,8 @@ void BaseTestEngine::setupMeshConnections(std::vector<mscclpp::MemoryChannel>& m
(getPacketBuff) ? getPacketBufRegMem : ((outputBuff) ? outputBufRegMem : inputBufRegMem);
// store memory to keep resource alive
scratchMemory_ = getPacketBufRegMem;
inputMemory_ = inputBufRegMem;
outputMemory_ = outputBufRegMem;
inputMemories_.push_back(inputBufRegMem);
outputMemories_.push_back(outputBufRegMem);
setupMeshConnectionsInternal(connections, localRegMemory, remoteRegMemories);

View File

@@ -132,8 +132,8 @@ class BaseTestEngine {
std::shared_ptr<mscclpp::Communicator> comm_;
std::shared_ptr<mscclpp::BaseProxyService> chanService_;
mscclpp::RegisteredMemory scratchMemory_;
mscclpp::RegisteredMemory inputMemory_;
mscclpp::RegisteredMemory outputMemory_;
std::vector<mscclpp::RegisteredMemory> inputMemories_;
std::vector<mscclpp::RegisteredMemory> outputMemories_;
cudaStream_t stream_;
int error_;
};