Use GpuIpcMem for NVLS connections (#719)

* Now `NvlsConnection` internally reuses `GpuIpcMem` for multicast
memory handling.
* Removed unnecessary barriers from `connectNvlsCollective()` (CUDA API
handles this automatically).
* Updated `GpuIpcMem::map()` and `GpuIpcMem::mapMulticast()` to return a
shared pointer with a custom deleter that performs the unmapping, which
prevents misuse of raw pointers and reduces the state that must be stored
in the `GpuIpcMem` instance.
* For `RuntimeIpc`-type handles, `cudaIpcOpenMemHandle` is now called in
`GpuIpcMem::map()` instead of in the `GpuIpcMem` constructor, for
consistency with the other handle types.

---------

Co-authored-by: Binyang Li <binyli@microsoft.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Binyang2014 <9415966+Binyang2014@users.noreply.github.com>
This commit is contained in:
Changho Hwang
2026-01-14 21:16:04 -08:00
committed by GitHub
parent c2a87302bd
commit 105239fc6c
7 changed files with 226 additions and 290 deletions

View File

@@ -14,12 +14,12 @@ class NvlsConnection;
struct SwitchChannel {
private:
void* devicePtr_;
std::shared_ptr<char> mcPtr_;
std::shared_ptr<void> mcPtr_;
size_t bufferSize_;
public:
using DeviceHandle = SwitchChannelDeviceHandle;
SwitchChannel(void* devicePtr, std::shared_ptr<char> mcPtr, size_t bufferSize)
SwitchChannel(void* devicePtr, std::shared_ptr<void> mcPtr, size_t bufferSize)
: devicePtr_(devicePtr), mcPtr_(mcPtr), bufferSize_(bufferSize) {}
DeviceHandle deviceHandle() const;
void* getDevicePtr();
@@ -34,10 +34,6 @@ class NvlsConnection {
NvlsConnection() = delete;
std::vector<char> serialize();
// Everyone needs to synchronize after creating a NVLS connection before adding devices
void addDevice();
void addDevice(int cudaDeviceId);
/// Bind the memory allocated via mscclpp::GpuBuffer to the multicast handle. The behavior
/// is undefined if the devicePtr is not allocated by mscclpp::GpuBuffer.
/// @param devicePtr The device pointer returned by `mscclpp::GpuBuffer::data()`.
@@ -45,8 +41,6 @@ class NvlsConnection {
/// @return SwitchChannel with devicePtr, mcPtr and bufferSize
SwitchChannel bindAllocatedMemory(CUdeviceptr devicePtr, size_t size);
size_t getMultiCastMinGranularity();
private:
class Impl;
std::shared_ptr<Impl> pimpl_;

View File

@@ -29,8 +29,7 @@ void register_nvls(nb::module_& m) {
});
nb::class_<NvlsConnection>(m, "NvlsConnection")
.def("bind_allocated_memory", &NvlsConnection::bindAllocatedMemory, nb::arg("device_ptr"), nb::arg("size"))
.def("get_multicast_min_granularity", &NvlsConnection::getMultiCastMinGranularity);
.def("bind_allocated_memory", &NvlsConnection::bindAllocatedMemory, nb::arg("device_ptr"), nb::arg("size"));
m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("all_ranks"),
nb::arg("buffer_size"));

View File

@@ -258,17 +258,12 @@ UniqueGpuIpcMemHandle GpuIpcMemHandle::createMulticast([[maybe_unused]] size_t b
#endif // !(CUDA_NVLS_API_AVAILABLE)
}
std::shared_ptr<GpuIpcMem> GpuIpcMem::create(const GpuIpcMemHandle& handle) {
return std::shared_ptr<GpuIpcMem>(new GpuIpcMem(handle));
}
GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle)
: handle_(handle),
allocHandle_(0),
multicastBuffer_(nullptr),
isMulticast_(false),
multicastBindedAddr_(0),
type_(GpuIpcMemHandle::Type::None),
basePtr_(nullptr),
baseSize_(0),
dataPtr_(nullptr),
dataSize_(0) {
: handle_(handle), allocHandle_(0), multicastAddedDeviceId_(-1), type_(GpuIpcMemHandle::Type::None) {
if (handle_.typeFlags == GpuIpcMemHandle::Type::None) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "GpuIpcMemHandle type is None, cannot create GpuIpcMem");
}
@@ -288,16 +283,7 @@ GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle)
::close(fileDesc);
}
if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::RuntimeIpc)) {
cudaError_t err = cudaIpcOpenMemHandleWrapper(&basePtr_, handle_.runtimeIpc.handle);
if (err == cudaSuccess) {
baseSize_ = handle_.baseSize;
dataPtr_ = static_cast<void*>(static_cast<char*>(basePtr_) + handle_.offsetFromBase);
dataSize_ = handle_.baseSize - handle_.offsetFromBase;
type_ = GpuIpcMemHandle::Type::RuntimeIpc;
return;
} else {
(void)cudaGetLastError();
}
type_ = GpuIpcMemHandle::Type::RuntimeIpc;
}
if (type_ == GpuIpcMemHandle::Type::None) {
THROW(GPU, Error, ErrorCode::Aborted, "Failed to open GpuIpcMemHandle (type: ", handle_.typeFlags, ")");
@@ -305,44 +291,9 @@ GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle)
}
GpuIpcMem::~GpuIpcMem() {
if (type_ == GpuIpcMemHandle::Type::RuntimeIpc) {
cudaError_t err = cudaIpcCloseMemHandleWrapper(basePtr_, handle_.runtimeIpc.handle);
if (err != cudaSuccess) {
WARN(GPU, "Failed to close CUDA IPC handle at pointer ", basePtr_, ": ", cudaGetErrorString(err));
(void)cudaGetLastError();
}
} else if (type_ == GpuIpcMemHandle::Type::PosixFd || type_ == GpuIpcMemHandle::Type::Fabric) {
if (type_ == GpuIpcMemHandle::Type::PosixFd || type_ == GpuIpcMemHandle::Type::Fabric) {
CUresult res;
const char* errStr;
if (basePtr_) {
res = cuMemUnmap((CUdeviceptr)basePtr_, baseSize_);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to unmap CUDA memory at pointer ", basePtr_, ": ", errStr);
}
res = cuMemAddressFree((CUdeviceptr)basePtr_, baseSize_);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to free CUDA memory at pointer ", basePtr_, ": ", errStr);
}
}
#if (CUDA_NVLS_API_AVAILABLE)
if (isMulticast_ && multicastBindedAddr_) {
int deviceId;
res = cuPointerGetAttribute(&deviceId, CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL, multicastBindedAddr_);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to get device ordinal for pointer ", (void*)multicastBindedAddr_, ": ", errStr);
deviceId = -1;
} else if (deviceId < 0) {
WARN(GPU, "Invalid device ordinal ", deviceId, " for pointer ", (void*)multicastBindedAddr_);
}
CUdevice device;
if (cuDeviceGet(&device, deviceId) == CUDA_SUCCESS) {
(void)cuMulticastUnbind(allocHandle_, device, 0, baseSize_);
}
}
#endif // (CUDA_NVLS_API_AVAILABLE)
res = cuMemRelease(allocHandle_);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
@@ -351,12 +302,24 @@ GpuIpcMem::~GpuIpcMem() {
}
}
void* GpuIpcMem::map() {
std::shared_ptr<void> GpuIpcMem::map() {
if (type_ == GpuIpcMemHandle::Type::None) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "GpuIpcMemHandle type is None, cannot map memory");
} else if (dataPtr_ != nullptr) {
// Already mapped
return dataPtr_;
}
if (type_ == GpuIpcMemHandle::Type::RuntimeIpc) {
// RuntimeIpc: Open handle and return shared_ptr with cleanup in deleter
void* basePtr = nullptr;
MSCCLPP_CUDATHROW(cudaIpcOpenMemHandleWrapper(&basePtr, handle_.runtimeIpc.handle));
void* dataPtr = static_cast<void*>(static_cast<char*>(basePtr) + handle_.offsetFromBase);
cudaIpcMemHandle_t ipcHandle = handle_.runtimeIpc.handle;
return std::shared_ptr<void>(dataPtr, [self = shared_from_this(), basePtr, ipcHandle](void*) {
cudaError_t err = cudaIpcCloseMemHandleWrapper(basePtr, ipcHandle);
if (err != cudaSuccess) {
WARN(GPU, "Failed to close CUDA IPC handle at pointer ", basePtr, ": ", cudaGetErrorString(err));
(void)cudaGetLastError();
}
});
}
size_t pageSize = getpagesize();
@@ -384,15 +347,33 @@ void* GpuIpcMem::map() {
accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
MSCCLPP_CUTHROW(cuMemSetAccess(base, handle_.baseSize, &accessDesc, 1));
basePtr_ = (void*)base;
baseSize_ = handle_.baseSize;
dataPtr_ = static_cast<void*>(static_cast<char*>(basePtr_) + handle_.offsetFromBase);
dataSize_ = handle_.baseSize - handle_.offsetFromBase;
return dataPtr_;
void* basePtr = (void*)base;
size_t baseSize = handle_.baseSize;
void* dataPtr = static_cast<void*>(static_cast<char*>(basePtr) + handle_.offsetFromBase);
// Return shared_ptr with deleter that unmaps and frees memory
return std::shared_ptr<void>(dataPtr, [self = shared_from_this(), basePtr, baseSize](void*) {
CUresult res;
const char* errStr;
res = cuMemUnmap((CUdeviceptr)basePtr, baseSize);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to unmap CUDA memory at pointer ", basePtr, ": ", errStr);
}
res = cuMemAddressFree((CUdeviceptr)basePtr, baseSize);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to free CUDA memory at pointer ", basePtr, ": ", errStr);
}
// self release will trigger ~GpuIpcMem() which releases allocHandle_
});
}
void* GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]] const CUdeviceptr bufferAddr,
[[maybe_unused]] size_t bufferSize) {
std::shared_ptr<void> GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]] size_t mcOffset,
[[maybe_unused]] CUdeviceptr bufferAddr,
[[maybe_unused]] size_t bufferSize) {
#if (CUDA_NVLS_API_AVAILABLE)
if (type_ != GpuIpcMemHandle::Type::PosixFd && type_ != GpuIpcMemHandle::Type::Fabric) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
@@ -400,7 +381,13 @@ void* GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]]
}
int deviceId;
MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
MSCCLPP_CUTHROW(cuMulticastAddDevice(allocHandle_, deviceId));
if (multicastAddedDeviceId_ == -1) {
MSCCLPP_CUTHROW(cuMulticastAddDevice(allocHandle_, deviceId));
multicastAddedDeviceId_ = deviceId;
} else if (multicastAddedDeviceId_ != deviceId) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "Multicast device ID mismatch: expected ", multicastAddedDeviceId_,
", but got ", deviceId);
}
size_t minMcGran;
CUmulticastObjectProp prop = {};
@@ -410,34 +397,26 @@ void* GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]]
MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGran, &prop, CU_MULTICAST_GRANULARITY_MINIMUM));
CUdeviceptr bufferPtr;
if (bufferAddr != 0) {
if (!isCuMemMapAllocated((void*)bufferAddr)) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"This NVLS connection tried to bind a buffer that was not allocated with cuMemMap");
}
if ((uintptr_t)bufferAddr % minMcGran != 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"This NVLS connection tried to bind a buffer that is not aligned to the minimum granularity", minMcGran);
}
if (bufferSize == 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "NVLS buffer size should be larger than zero.");
}
if (bufferSize % minMcGran != 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"Tried to bind a multicast buffer that is not aligned to the minimum granularity ", minMcGran,
", buffer size: ", bufferSize);
}
bufferPtr = bufferAddr;
} else {
multicastBuffer_ = GpuBuffer<uint8_t>(handle_.baseSize).memory();
bufferPtr = (CUdeviceptr)(multicastBuffer_.get());
bufferSize = handle_.baseSize;
if (!isCuMemMapAllocated((void*)bufferAddr)) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"This NVLS connection tried to bind a buffer that was not allocated with cuMemMap");
}
if ((uintptr_t)bufferAddr % minMcGran != 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"This NVLS connection tried to bind a buffer that is not aligned to the minimum granularity ", minMcGran);
}
if (bufferSize == 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "NVLS buffer size should be larger than zero.");
}
if (bufferSize % minMcGran != 0) {
THROW(GPU, Error, ErrorCode::InvalidUsage,
"Tried to bind a multicast buffer that is not aligned to the minimum granularity ", minMcGran,
", buffer size: ", bufferSize);
}
// will block until all devices call cuMulticastAddDevice()
MSCCLPP_CUTHROW(cuMulticastBindAddr(allocHandle_, 0, bufferPtr, bufferSize, 0));
multicastBindedAddr_ = bufferPtr;
// Bind the buffer at the specified offset in the multicast handle
// This will block until all devices call cuMulticastAddDevice()
MSCCLPP_CUTHROW(cuMulticastBindAddr(allocHandle_, mcOffset, bufferAddr, bufferSize, 0));
CUdeviceptr mcPtr;
MSCCLPP_CUTHROW(cuMemAddressReserve(&mcPtr, bufferSize, minMcGran, 0U, 0));
@@ -449,23 +428,35 @@ void* GpuIpcMem::mapMulticast([[maybe_unused]] int numDevices, [[maybe_unused]]
accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
MSCCLPP_CUTHROW(cuMemSetAccess(mcPtr, bufferSize, &accessDesc, 1));
basePtr_ = (void*)mcPtr;
baseSize_ = handle_.baseSize;
dataPtr_ = basePtr_;
dataSize_ = bufferSize;
isMulticast_ = true;
return dataPtr_;
// Return shared_ptr with custom deleter that unmaps and unbinds
CUmemGenericAllocationHandle allocHandle = allocHandle_;
return std::shared_ptr<void>(
reinterpret_cast<void*>(mcPtr), [self = shared_from_this(), mcOffset, bufferSize, allocHandle](void* ptr) {
CUresult res;
const char* errStr;
res = cuMemUnmap((CUdeviceptr)ptr, bufferSize);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to unmap CUDA memory at pointer ", (void*)ptr, ": ", errStr);
}
res = cuMemAddressFree((CUdeviceptr)ptr, bufferSize);
if (res != CUDA_SUCCESS) {
(void)cuGetErrorString(res, &errStr);
WARN(GPU, "Failed to free CUDA memory at pointer ", (void*)ptr, ": ", errStr);
}
int deviceId;
CUdevice device;
if (cudaGetDevice(&deviceId) == cudaSuccess && cuDeviceGet(&device, deviceId) == CUDA_SUCCESS) {
(void)cuMulticastUnbind(allocHandle, device, mcOffset, bufferSize);
}
});
#else // !(CUDA_NVLS_API_AVAILABLE)
THROW(GPU, Error, ErrorCode::InvalidUsage,
"NVLS is not supported on this device (requires CUDA version >= 12.3 and Linux kernel version >= 5.6.0)");
#endif // !(CUDA_NVLS_API_AVAILABLE)
}
void* GpuIpcMem::data() const {
if (!dataPtr_) {
THROW(GPU, Error, ErrorCode::InvalidUsage, "GpuIpcMem data pointer is null. Call map() first.");
}
return dataPtr_;
}
} // namespace mscclpp

View File

@@ -77,33 +77,37 @@ static_assert(std::is_trivially_copyable_v<GpuIpcMemHandle>);
/// GpuIpcMem represents a GPU memory region that has been imported using a GpuIpcMemHandle.
/// If a RegisteredMemory instance represents an imported GPU memory, it will manage a unique
/// GpuIpcMem instance for that memory region.
class GpuIpcMem {
class GpuIpcMem : public std::enable_shared_from_this<GpuIpcMem> {
public:
GpuIpcMem(const GpuIpcMemHandle &handle);
/// Create a GpuIpcMem instance from a GpuIpcMemHandle.
/// @param handle The handle to import.
/// @return A shared_ptr to the created GpuIpcMem instance.
static std::shared_ptr<GpuIpcMem> create(const GpuIpcMemHandle &handle);
~GpuIpcMem();
void *map();
/// Map the imported GPU memory for access. Subsequent calls to map() will simply create a new mapping
/// to the same memory, which is not a desired usage pattern.
/// @return A shared_ptr to the mapped memory. When all references are released,
/// the memory is automatically unmapped.
std::shared_ptr<void> map();
void *mapMulticast(int numDevices, const CUdeviceptr bufferAddr = 0, size_t bufferSize = 0);
void *multicastBuffer() const { return isMulticast_ ? multicastBuffer_.get() : nullptr; }
void *data() const;
size_t size() const { return dataSize_; }
/// Map multicast memory at the given offset.
/// @param numDevices Number of devices participating in multicast.
/// @param mcOffset Offset in the multicast buffer.
/// @param bufferAddr Device pointer to bind.
/// @param bufferSize Size of the buffer to bind.
/// @return A shared_ptr to the mapped multicast memory. When all references are released,
/// the memory is automatically unmapped and unbound.
std::shared_ptr<void> mapMulticast(int numDevices, size_t mcOffset, CUdeviceptr bufferAddr, size_t bufferSize);
private:
GpuIpcMem(const GpuIpcMemHandle &handle);
GpuIpcMemHandle handle_;
CUmemGenericAllocationHandle allocHandle_;
std::shared_ptr<uint8_t> multicastBuffer_;
bool isMulticast_;
CUdeviceptr multicastBindedAddr_;
int multicastAddedDeviceId_;
uint8_t type_;
void *basePtr_;
size_t baseSize_;
void *dataPtr_;
size_t dataSize_;
};
} // namespace mscclpp

View File

@@ -42,7 +42,7 @@ struct RegisteredMemory::Impl {
std::shared_ptr<void> peerMemHandle;
UniqueGpuIpcMemHandle localGpuIpcMemHandle;
std::unique_ptr<GpuIpcMem> remoteGpuIpcMem;
std::shared_ptr<void> remoteMemMap;
// Only used for IB transport
std::unordered_map<Transport, std::unique_ptr<const IbMr>> ibMrMap;

View File

@@ -159,8 +159,10 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
}
} else if (transports.has(Transport::CudaIpc)) {
auto entry = getTransportInfo(Transport::CudaIpc);
this->remoteGpuIpcMem = std::make_unique<GpuIpcMem>(entry.gpuIpcMemHandle);
this->data = this->remoteGpuIpcMem->map();
auto gpuIpcMem = GpuIpcMem::create(entry.gpuIpcMemHandle);
// Create a memory map for the remote GPU memory. The memory map will keep the GpuIpcMem instance alive.
this->remoteMemMap = gpuIpcMem->map();
this->data = this->remoteMemMap.get();
}
if (this->data != nullptr) {
INFO(GPU, "Opened CUDA IPC handle at pointer ", this->data);

View File

@@ -1,136 +1,120 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <sys/syscall.h>
#include <unistd.h>
// Licensed under the MIT License.
#include <algorithm>
#include <list>
#include <mscclpp/core.hpp>
#include <mscclpp/switch_channel.hpp>
#include <mscclpp/utils.hpp>
#include "api.h"
#include "debug.h"
#include "endpoint.hpp"
#include "unix_socket.hpp"
#include "gpu_ipc_mem.hpp"
#include "logger.hpp"
#include "serialization.hpp"
namespace mscclpp {
#if (CUDA_NVLS_API_AVAILABLE)
class NvlsConnection::Impl : public std::enable_shared_from_this<NvlsConnection::Impl> {
public:
// use this only for the root of the NVLS
// For root
Impl(size_t bufferSize, int numDevices);
// For non-root
Impl(const std::vector<char>& data);
~Impl();
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;
size_t getMinMcGran() { return minMcGran_; }
std::vector<char> serialize();
void addDevice(int cudaDeviceId);
size_t allocateBuffer(size_t size);
void freeBuffer(size_t offset, size_t size) noexcept;
std::shared_ptr<char> bindMemory(CUdeviceptr devicePtr, size_t devBuffSize);
std::shared_ptr<void> bindMemory(CUdeviceptr devicePtr, size_t devBuffSize);
private:
friend class NvlsConnection;
CUmemGenericAllocationHandle mcHandle_;
CUmulticastObjectProp mcProp_;
size_t bufferSize_;
size_t allocateRange(size_t size);
void freeRange(size_t offset, size_t size) noexcept;
// Store the GpuIpcMemHandle for the multicast (only on root)
UniqueGpuIpcMemHandle localGpuIpcMemHandle_;
// The GpuIpcMem for multicast operations (both root and non-root)
std::shared_ptr<GpuIpcMem> gpuIpcMem_;
size_t minMcGran_;
size_t mcGran_;
// These are only defined for multicast (NVLS) capability
int rootFd_;
int rootPid_;
int mcFileDesc_;
UnixSocketClient& socketClient_ = UnixSocketClient::instance();
bool isRoot_;
int numDevices_;
// Track allocated and free ranges within the multicast buffer
std::list<std::pair<size_t, size_t>> allocatedRanges_;
std::list<std::pair<size_t, size_t>> freeRanges_;
};
NvlsConnection::Impl::Impl(size_t bufferSize, int numDevices) : rootFd_(-1), mcFileDesc_(-1) {
minMcGran_ = 0;
mcGran_ = 0;
mcProp_ = {};
mcProp_.size = bufferSize;
mcProp_.numDevices = numDevices;
mcProp_.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGran_, &mcProp_, CU_MULTICAST_GRANULARITY_MINIMUM));
MSCCLPP_CUTHROW(cuMulticastGetGranularity(&mcGran_, &mcProp_, CU_MULTICAST_GRANULARITY_RECOMMENDED));
mcProp_.size = ((mcProp_.size + mcGran_ - 1) / mcGran_) * mcGran_;
bufferSize_ = mcProp_.size;
INFO(MSCCLPP_COLL, "NVLS multicast properties: size=%ld, numDevices=%d, handleTypes=%lld", mcProp_.size,
mcProp_.numDevices, mcProp_.handleTypes);
MSCCLPP_CUTHROW(cuMulticastCreate(&mcHandle_, &mcProp_));
MSCCLPP_CUTHROW(
cuMemExportToShareableHandle(&mcFileDesc_, mcHandle_, CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0 /*flags*/));
freeRanges_.emplace_back(0, bufferSize_);
rootPid_ = getpid();
rootFd_ = UnixSocketServer::instance().registerFd(mcFileDesc_);
INFO(MSCCLPP_COLL,
"NVLS handle created on root with size %ld. minGranularity %ld and recommendedGranularity %ld buffer size is "
"%ld, adjusted size is %ld",
mcProp_.size, minMcGran_, mcGran_, bufferSize, bufferSize_);
}
NvlsConnection::Impl::Impl(const std::vector<char>& data) : rootFd_(-1), mcFileDesc_(-1) {
auto it = data.begin();
std::copy_n(it, sizeof(this->mcHandle_), reinterpret_cast<char*>(&this->mcHandle_));
it += sizeof(this->mcHandle_);
std::copy_n(it, sizeof(this->bufferSize_), reinterpret_cast<char*>(&this->bufferSize_));
it += sizeof(this->bufferSize_);
std::copy_n(it, sizeof(this->minMcGran_), reinterpret_cast<char*>(&this->minMcGran_));
it += sizeof(this->minMcGran_);
std::copy_n(it, sizeof(this->mcGran_), reinterpret_cast<char*>(&this->mcGran_));
it += sizeof(this->mcGran_);
std::copy_n(it, sizeof(this->rootPid_), reinterpret_cast<char*>(&this->rootPid_));
it += sizeof(this->rootPid_);
std::copy_n(it, sizeof(this->rootFd_), reinterpret_cast<char*>(&this->rootFd_));
freeRanges_.emplace_back(0, bufferSize_);
int mcRootFileDescFd = socketClient_.requestFd(UnixSocketServer::generateSocketPath(this->rootPid_), rootFd_);
MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&mcHandle_, reinterpret_cast<void*>(mcRootFileDescFd),
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
::close(mcRootFileDescFd);
INFO(MSCCLPP_COLL, "NVLS handle was imported from root");
}
NvlsConnection::Impl::~Impl() {
// we don't need to free multicast handle object according to NCCL.
if (mcFileDesc_ >= 0) {
UnixSocketServer::instance().unregisterFd(rootFd_);
::close(mcFileDesc_);
NvlsConnection::Impl::Impl(size_t bufferSize, int numDevices) : isRoot_(true), numDevices_(numDevices) {
// Create the multicast handle using GpuIpcMemHandle
localGpuIpcMemHandle_ = GpuIpcMemHandle::createMulticast(bufferSize, numDevices);
if (!localGpuIpcMemHandle_ || localGpuIpcMemHandle_->typeFlags == GpuIpcMemHandle::Type::None) {
THROW(CONN, Error, ErrorCode::SystemError, "Failed to create multicast handle");
}
// Create GpuIpcMem from the handle to get access to the allocation handle
gpuIpcMem_ = GpuIpcMem::create(*localGpuIpcMemHandle_);
// Compute minimum granularity for user buffer alignment
CUmulticastObjectProp mcProp = {};
mcProp.size = localGpuIpcMemHandle_->baseSize;
mcProp.numDevices = numDevices_;
size_t minMcGranPosixFd;
mcProp.handleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGranPosixFd, &mcProp, CU_MULTICAST_GRANULARITY_MINIMUM));
if (localGpuIpcMemHandle_->typeFlags & GpuIpcMemHandle::Type::Fabric) {
size_t minMcGranFabric;
mcProp.handleTypes = CU_MEM_HANDLE_TYPE_FABRIC;
MSCCLPP_CUTHROW(cuMulticastGetGranularity(&minMcGranFabric, &mcProp, CU_MULTICAST_GRANULARITY_MINIMUM));
minMcGran_ = std::max(minMcGranPosixFd, minMcGranFabric);
} else {
minMcGran_ = minMcGranPosixFd;
}
// Initialize free ranges with the entire buffer
freeRanges_.emplace_back(0, localGpuIpcMemHandle_->baseSize);
INFO(CONN, "NVLS handle created on root with buffer size ", localGpuIpcMemHandle_->baseSize, ", minGranularity ",
minMcGran_);
}
NvlsConnection::Impl::Impl(const std::vector<char>& data) : isRoot_(false) {
auto it = data.begin();
GpuIpcMemHandle handle;
it = detail::deserialize(it, handle);
it = detail::deserialize(it, minMcGran_);
it = detail::deserialize(it, numDevices_);
// Create GpuIpcMem from the handle to import the multicast
gpuIpcMem_ = GpuIpcMem::create(handle);
// Initialize free ranges with the entire buffer
freeRanges_.emplace_back(0, handle.baseSize);
INFO(CONN, "NVLS handle is imported from root with buffer size ", handle.baseSize);
}
std::vector<char> NvlsConnection::Impl::serialize() {
if (!isRoot_) {
THROW(CONN, Error, ErrorCode::InvalidUsage, "Only root NVLS connection can serialize the handle");
}
std::vector<char> result;
std::copy_n(reinterpret_cast<char*>(&mcHandle_), sizeof(mcHandle_), std::back_inserter(result));
std::copy_n(reinterpret_cast<char*>(&bufferSize_), sizeof(bufferSize_), std::back_inserter(result));
std::copy_n(reinterpret_cast<char*>(&minMcGran_), sizeof(minMcGran_), std::back_inserter(result));
std::copy_n(reinterpret_cast<char*>(&mcGran_), sizeof(mcGran_), std::back_inserter(result));
std::copy_n(reinterpret_cast<char*>(&rootPid_), sizeof(rootPid_), std::back_inserter(result));
std::copy_n(reinterpret_cast<char*>(&rootFd_), sizeof(rootFd_), std::back_inserter(result));
detail::serialize(result, *localGpuIpcMemHandle_);
detail::serialize(result, minMcGran_);
detail::serialize(result, numDevices_);
return result;
}
void NvlsConnection::Impl::addDevice(int cudaDeviceId) {
MSCCLPP_CUTHROW(cuMulticastAddDevice(mcHandle_, cudaDeviceId));
INFO(MSCCLPP_COLL, "NVLS connection created");
}
// TODO(binyli): For cuMemMap, we can not map handle to va with offset not equal to 0.
// Then we don't need to maintain the freeRanges_ list. For different memory, we could map to different mc handle.
size_t NvlsConnection::Impl::allocateBuffer(size_t size) {
size_t NvlsConnection::Impl::allocateRange(size_t size) {
if (freeRanges_.empty()) {
throw Error("This NVLS connection mapped more than it was supposed to", ErrorCode::InvalidUsage);
THROW(CONN, Error, ErrorCode::InvalidUsage, "This NVLS connection mapped more than it was supposed to");
}
auto it = std::find_if(freeRanges_.begin(), freeRanges_.end(),
[size](const std::pair<size_t, size_t>& range) { return range.second >= size; });
@@ -144,18 +128,18 @@ size_t NvlsConnection::Impl::allocateBuffer(size_t size) {
it->second -= size;
}
allocatedRanges_.emplace_back(offset, size);
INFO(MSCCLPP_COLL, "NVLS connection allocated %ld bytes at offset %ld", size, offset);
INFO(CONN, "NVLS connection allocated ", size, " bytes at offset ", offset);
return offset;
}
throw Error("This NVLS connection cannot map the requested devBuffSize", ErrorCode::InvalidUsage);
THROW(CONN, Error, ErrorCode::InvalidUsage, "This NVLS connection cannot map the requested devBuffSize");
}
void NvlsConnection::Impl::freeBuffer(size_t offset, size_t size) noexcept {
void NvlsConnection::Impl::freeRange(size_t offset, size_t size) noexcept {
auto it = std::find_if(
allocatedRanges_.begin(), allocatedRanges_.end(),
[offset, size](const std::pair<size_t, size_t>& range) { return range.first == offset && range.second == size; });
if (it == allocatedRanges_.end()) {
WARN("NVLS connection tried to free a buffer that was not allocated");
WARN(CONN, "NVLS connection tried to free a range that was not allocated");
return;
}
allocatedRanges_.erase(it);
@@ -187,42 +171,23 @@ void NvlsConnection::Impl::freeBuffer(size_t offset, size_t size) noexcept {
}
}
std::shared_ptr<char> NvlsConnection::Impl::bindMemory(CUdeviceptr devicePtr, size_t devBuffSize) {
if (!isCuMemMapAllocated((void*)devicePtr)) {
throw Error("This NVLS connection tried to bind a buffer that was not allocated with cuMemMap",
ErrorCode::InvalidUsage);
}
if ((uintptr_t)devicePtr % minMcGran_ != 0) {
WARN("NVLS connection tried to bind a buffer that is not aligned to the minimum granularity");
throw Error("This NVLS connection tried to bind a buffer that is not aligned to the minimum granularity",
ErrorCode::InvalidUsage);
}
std::shared_ptr<void> NvlsConnection::Impl::bindMemory(CUdeviceptr devicePtr, size_t devBuffSize) {
// Align buffer size to minimum granularity
devBuffSize = ((devBuffSize + minMcGran_ - 1) / minMcGran_) * minMcGran_;
size_t offset = allocateBuffer(devBuffSize);
MSCCLPP_CUTHROW(cuMulticastBindAddr(mcHandle_, offset /*mcOffset*/, devicePtr, devBuffSize, 0));
char* mcPtr;
MSCCLPP_CUTHROW(cuMemAddressReserve((CUdeviceptr*)(&mcPtr), devBuffSize, minMcGran_, 0U, 0));
MSCCLPP_CUTHROW(cuMemMap((CUdeviceptr)(mcPtr), devBuffSize, 0, mcHandle_, 0));
detail::setReadWriteMemoryAccess(mcPtr, devBuffSize);
INFO(MSCCLPP_COLL, "NVLS connection bound memory %p to %p at offset %ld, size %ld", (void*)devicePtr, mcPtr, offset,
// Allocate a range in the multicast buffer
size_t offset = allocateRange(devBuffSize);
// mapMulticast returns a shared_ptr that handles cleanup when released
std::shared_ptr<void> mcPtr = gpuIpcMem_->mapMulticast(numDevices_, offset, devicePtr, devBuffSize);
INFO(CONN, "NVLS connection bound memory ", (void*)devicePtr, " to ", mcPtr.get(), " at offset ", offset, ", size ",
devBuffSize);
auto deleter = [=, self = shared_from_this()](char* ptr) {
int deviceId;
CUdevice device;
MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
MSCCLPP_CUTHROW(cuDeviceGet(&device, deviceId));
MSCCLPP_CUTHROW(cuMemUnmap((CUdeviceptr)ptr, devBuffSize));
MSCCLPP_CUTHROW(cuMemAddressFree((CUdeviceptr)ptr, devBuffSize));
// Refer to NCCL, Unbind can trigger RM error if buffer is freed already by users.
// Ignore error here, unbind will succeed anyway.
cuMulticastUnbind(mcHandle_, device, offset, devBuffSize);
self->freeBuffer(offset, devBuffSize);
};
return std::shared_ptr<char>(mcPtr, deleter);
// Wrap mcPtr with an additional deleter that frees the range
return std::shared_ptr<void>(mcPtr.get(), [self = shared_from_this(), mcPtr, offset, devBuffSize](void*) {
// mcPtr destructor will handle unmap/unbind; we just need to free the range
self->freeRange(offset, devBuffSize);
});
}
#else // !(CUDA_NVLS_API_AVAILABLE)
@@ -236,11 +201,9 @@ class NvlsConnection::Impl {
Impl& operator=(const Impl&) = delete;
std::vector<char> serialize() { throw notSupportedError; }
size_t allocateBuffer(size_t) { throw notSupportedError; }
void freeBuffer(size_t, size_t) { throw notSupportedError; }
std::shared_ptr<char> bindMemory(CUdeviceptr, size_t) { throw notSupportedError; }
void addDevice(int) { throw notSupportedError; }
size_t getMinMcGran() { throw notSupportedError; }
size_t allocateRange(size_t) { throw notSupportedError; }
void freeRange(size_t, size_t) { throw notSupportedError; }
std::shared_ptr<void> bindMemory(CUdeviceptr, size_t) { throw notSupportedError; }
private:
Error notSupportedError =
@@ -251,14 +214,6 @@ class NvlsConnection::Impl {
NvlsConnection::NvlsConnection(size_t bufferSize, int numDevices)
: pimpl_(std::make_shared<Impl>(bufferSize, numDevices)) {}
void NvlsConnection::addDevice() {
int cudaDeviceId;
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDeviceId));
this->addDevice(cudaDeviceId);
}
void NvlsConnection::addDevice(int cudaDeviceId) { pimpl_->addDevice(cudaDeviceId); }
NvlsConnection::NvlsConnection(const std::vector<char>& data) : pimpl_(std::make_shared<Impl>(data)) {}
std::vector<char> NvlsConnection::serialize() { return pimpl_->serialize(); }
@@ -270,16 +225,14 @@ SwitchChannel NvlsConnection::bindAllocatedMemory(CUdeviceptr devicePtr, size_t
SwitchChannel::DeviceHandle SwitchChannel::deviceHandle() const {
SwitchChannel::DeviceHandle device;
device.devicePtr = this->devicePtr_;
device.mcPtr = this->mcPtr_.get();
device.bufferSize = this->bufferSize_;
device.devicePtr = devicePtr_;
device.mcPtr = mcPtr_.get();
device.bufferSize = bufferSize_;
return device;
};
void* SwitchChannel::getDevicePtr() { return devicePtr_; };
size_t NvlsConnection::getMultiCastMinGranularity() { return pimpl_->getMinMcGran(); }
MSCCLPP_API_CPP std::shared_ptr<NvlsConnection> connectNvlsCollective(std::shared_ptr<Communicator> comm,
std::vector<int> allRanks, size_t bufferSize) {
auto bootstrap = comm->bootstrap();
@@ -309,13 +262,6 @@ MSCCLPP_API_CPP std::shared_ptr<NvlsConnection> connectNvlsCollective(std::share
conn = std::make_shared<NvlsConnection>(data);
}
// Now let's synchronize all ranks
bootstrap->groupBarrier(allRanks);
// now it is safe to add my device
conn->addDevice();
// sync here to make sure all ranks have added their devices
bootstrap->groupBarrier(allRanks);
return conn;
}