Support CudaIpc connection within a single process (#593)

* Allow CudaIpc connection between GPUs in a single process
* Added an example of connection in a single process
* Minor interface updates

---------

Co-authored-by: Binyang Li <binyli@microsoft.com>
This commit is contained in:
Changho Hwang
2025-08-02 12:59:36 +08:00
committed by GitHub
parent c3b47c59fd
commit c580e4c503
13 changed files with 262 additions and 49 deletions

View File

@@ -660,7 +660,7 @@ NCCL_API ncclResult_t ncclGetUniqueId(ncclUniqueId* uniqueId) {
WARN("uniqueId is nullptr");
return ncclInvalidArgument;
}
if (MSCCLPP_UNIQUE_ID_BYTES != NCCL_UNIQUE_ID_BYTES) return ncclInternalError;
if (mscclpp::UniqueIdBytes != NCCL_UNIQUE_ID_BYTES) return ncclInternalError;
mscclpp::UniqueId id = mscclpp::TcpBootstrap::createUniqueId();
memcpy(uniqueId, &id, sizeof(ncclUniqueId));
return ncclSuccess;

View File

@@ -0,0 +1 @@
gpu_ping_pong

View File

@@ -0,0 +1,22 @@
# Builds the single-process GPU ping-pong example against libmscclpp.
# Prefers CUDA's nvcc when available; otherwise falls back to ROCm's hipcc.
CUDA_HOME ?= /usr/local/cuda
ROCM_HOME ?= /opt/rocm

# Check if nvcc exists, otherwise use hipcc
ifeq ($(shell which $(CUDA_HOME)/bin/nvcc 2>/dev/null),)
COMPILER := $(ROCM_HOME)/bin/hipcc
ARCH_FLAG := -D__HIP_PLATFORM_AMD__=1
else
COMPILER := $(CUDA_HOME)/bin/nvcc
ARCH_FLAG := -arch=native
endif

TARGET = gpu_ping_pong
SRC = gpu_ping_pong.cu

# `all` and `clean` produce no file of their own name; declare them phony so
# a stray file called `all` or `clean` cannot shadow these rules.
.PHONY: all clean

all: $(TARGET)

$(TARGET): $(SRC)
	$(COMPILER) $(ARCH_FLAG) -o $@ $< -lmscclpp

clean:
	rm -f $(TARGET)

View File

@@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <iostream>
#include <mscclpp/core.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/memory_channel_device.hpp>
#include <sstream>
// Print all arguments as one line to stdout. The full message (including the
// trailing newline) is assembled in a local buffer first, so it reaches
// std::cout in a single insertion.
template <typename... Args>
void log(Args &&...args) {
  std::ostringstream buf;
  (buf << ... << std::forward<Args>(args));
  buf << std::endl;
  std::cout << buf.str();
}
// Busy-wait on the device for roughly `cycles` GPU clock ticks, measured with
// clock64(). Used to make one side of the ping-pong measurably slow.
__device__ void spin_cycles(unsigned long long cycles) {
  for (unsigned long long begin = clock64(); clock64() - begin < cycles;) {
    // spin until the deadline passes
  }
}
// GPU 0's half of the ping-pong: a single thread repeatedly waits for the
// peer's signal, burns ~1e7 clock ticks to simulate work, then signals back.
// Only global thread 0 participates; any other launched thread exits at once.
__global__ void gpuKernel0(mscclpp::BaseMemoryChannelDeviceHandle *devHandle, int iter) {
  const unsigned int globalTid = blockIdx.x * blockDim.x + threadIdx.x;
  if (globalTid != 0) return;
  for (int round = 0; round < iter; ++round) {
    devHandle->relaxedWait();
    // spin for a few ms
    spin_cycles(1e7);
    devHandle->relaxedSignal();
  }
}
// GPU 1's half of the ping-pong: a single thread signals the peer and then
// waits for the reply, `iter` times. Only global thread 0 participates.
__global__ void gpuKernel1(mscclpp::BaseMemoryChannelDeviceHandle *devHandle, int iter) {
  const unsigned int globalTid = blockIdx.x * blockDim.x + threadIdx.x;
  if (globalTid != 0) return;
  for (int round = 0; round < iter; ++round) {
    devHandle->relaxedSignal();
    devHandle->relaxedWait();
  }
}
int main() {
  // Ping-pong between GPU 0 and GPU 1 within a single process: each GPU owns
  // one end of a CudaIpc-backed memory channel, and the two kernels alternate
  // relaxedSignal()/relaxedWait() `iter` times. Returns 0 on success, 1 on an
  // unusable topology or unexpectedly fast timing.
  //
  // Optional: check if we have at least two GPUs
  int deviceCount;
  MSCCLPP_CUDATHROW(cudaGetDeviceCount(&deviceCount));
  if (deviceCount < 2) {
    log("Error: At least two GPUs are required.");
    return 1;
  }
  // Optional: check if the two GPUs can peer-to-peer access each other
  int canAccessPeer;
  MSCCLPP_CUDATHROW(cudaDeviceCanAccessPeer(&canAccessPeer, 0, 1));
  if (!canAccessPeer) {
    log("Error: GPU 0 cannot access GPU 1. Make sure that the GPUs are connected peer-to-peer. You can check this "
        "by running `nvidia-smi topo -m` (the connection between GPU 0 and 1 should be either NV# or PIX).");
    return 1;
  }
  const int iter = 100;
  const mscclpp::Transport transport = mscclpp::Transport::CudaIpc;
  log("Creating endpoints ...");
  auto ctx = mscclpp::Context::create();
  mscclpp::Endpoint ep0 = ctx->createEndpoint({transport, {mscclpp::DeviceType::GPU, 0}});
  mscclpp::Endpoint ep1 = ctx->createEndpoint({transport, {mscclpp::DeviceType::GPU, 1}});
  // One connection + semaphore stub per direction, created with the owning
  // GPU set as the current device.
  log("GPU 0: Creating a connection and a semaphore stub ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(0));
  std::shared_ptr<mscclpp::Connection> conn0 = ctx->connect(/*localEndpoint*/ ep0, /*remoteEndpoint*/ ep1);
  mscclpp::SemaphoreStub semaStub0(conn0);
  log("GPU 1: Creating a connection and a semaphore stub ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(1));
  std::shared_ptr<mscclpp::Connection> conn1 = ctx->connect(/*localEndpoint*/ ep1, /*remoteEndpoint*/ ep0);
  mscclpp::SemaphoreStub semaStub1(conn1);
  // Each GPU gets a semaphore pairing its local stub with the remote one, and
  // a device-resident copy of its channel handle for the kernel to use.
  log("GPU 0: Creating a semaphore and a memory channel ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(0));
  mscclpp::Semaphore sema0(/*localSemaphoreStub*/ semaStub0, /*remoteSemaphoreStub*/ semaStub1);
  mscclpp::BaseMemoryChannel memChan0(sema0);
  mscclpp::BaseMemoryChannelDeviceHandle memChanHandle0 = memChan0.deviceHandle();
  void *devHandle0;
  MSCCLPP_CUDATHROW(cudaMalloc(&devHandle0, sizeof(mscclpp::BaseMemoryChannelDeviceHandle)));
  MSCCLPP_CUDATHROW(cudaMemcpy(devHandle0, &memChanHandle0, sizeof(memChanHandle0), cudaMemcpyHostToDevice));
  log("GPU 1: Creating a semaphore and a memory channel ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(1));
  mscclpp::Semaphore sema1(/*localSemaphoreStub*/ semaStub1, /*remoteSemaphoreStub*/ semaStub0);
  mscclpp::BaseMemoryChannel memChan1(sema1);
  mscclpp::BaseMemoryChannelDeviceHandle memChanHandle1 = memChan1.deviceHandle();
  void *devHandle1;
  MSCCLPP_CUDATHROW(cudaMalloc(&devHandle1, sizeof(mscclpp::BaseMemoryChannelDeviceHandle)));
  MSCCLPP_CUDATHROW(cudaMemcpy(devHandle1, &memChanHandle1, sizeof(memChanHandle1), cudaMemcpyHostToDevice));
  log("GPU 0: Launching gpuKernel0 ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(0));
  gpuKernel0<<<1, 1>>>(reinterpret_cast<mscclpp::BaseMemoryChannelDeviceHandle *>(devHandle0), iter);
  MSCCLPP_CUDATHROW(cudaGetLastError());
  log("GPU 1: Launching gpuKernel1 ...");
  MSCCLPP_CUDATHROW(cudaSetDevice(1));
  // Time only GPU 1's kernel: each of its iterations completes only after
  // GPU 0 responds, so its events bracket the whole ping-pong.
  cudaEvent_t start, end;
  MSCCLPP_CUDATHROW(cudaEventCreate(&start));
  MSCCLPP_CUDATHROW(cudaEventCreate(&end));
  MSCCLPP_CUDATHROW(cudaEventRecord(start));
  gpuKernel1<<<1, 1>>>(reinterpret_cast<mscclpp::BaseMemoryChannelDeviceHandle *>(devHandle1), iter);
  MSCCLPP_CUDATHROW(cudaGetLastError());
  MSCCLPP_CUDATHROW(cudaEventRecord(end));
  MSCCLPP_CUDATHROW(cudaEventSynchronize(end));
  float elapsedMs;
  MSCCLPP_CUDATHROW(cudaEventElapsedTime(&elapsedMs, start, end));
  // Drain GPU 0's kernel as well before declaring a result.
  MSCCLPP_CUDATHROW(cudaSetDevice(0));
  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
  // Release the events and device-side handle buffers before either exit path
  // (they were previously leaked).
  MSCCLPP_CUDATHROW(cudaEventDestroy(start));
  MSCCLPP_CUDATHROW(cudaEventDestroy(end));
  MSCCLPP_CUDATHROW(cudaFree(devHandle0));
  MSCCLPP_CUDATHROW(cudaFree(devHandle1));
  float msPerIter = elapsedMs / iter;
  log("Elapsed ", msPerIter, " ms per iteration (", iter, ")");
  // gpuKernel0 spins ~1e7 clock ticks per round, so a sub-millisecond average
  // suggests the semaphore hand-off was not actually enforced.
  if (msPerIter < 1.0f) {
    log("Failed: the elapsed time per iteration is less than 1 ms, which may indicate that the relaxedSignal "
        "and relaxedWait are not working as expected.");
    return 1;
  }
  log("Succeed!");
  return 0;
}

View File

@@ -19,10 +19,10 @@
namespace mscclpp {
#define MSCCLPP_UNIQUE_ID_BYTES 128
constexpr unsigned int UniqueIdBytes = 128;
/// Unique ID for initializing the TcpBootstrap.
using UniqueId = std::array<uint8_t, MSCCLPP_UNIQUE_ID_BYTES>;
using UniqueId = std::array<uint8_t, UniqueIdBytes>;
/// Return a version string.
/// @return The MSCCL++ version string in "major.minor.patch" format.
@@ -207,7 +207,6 @@ class TcpBootstrap : public Bootstrap {
enum class Transport {
Unknown, // Unknown transport type.
CudaIpc, // CUDA IPC transport type.
Nvls, // NVLS transport type.
IB0, // InfiniBand device 0 transport type.
IB1, // InfiniBand device 1 transport type.
IB2, // InfiniBand device 2 transport type.
@@ -221,7 +220,7 @@ enum class Transport {
};
namespace detail {
const size_t TransportFlagsSize = 12;
const size_t TransportFlagsSize = 11;
static_assert(TransportFlagsSize == static_cast<size_t>(Transport::NumTransports),
"TransportFlagsSize must match the number of transports");
/// Bitset for storing transport flags.
@@ -441,6 +440,14 @@ class Endpoint {
/// @return The device used.
const Device& device() const;
/// Get the host hash.
/// @return The host hash.
uint64_t hostHash() const;
/// Get the process ID hash.
/// @return The process ID hash.
uint64_t pidHash() const;
/// Get the maximum write queue size.
/// @return The maximum number of write requests that can be queued.
int maxWriteQueueSize() const;
@@ -467,9 +474,9 @@ class Endpoint {
class Connection {
public:
/// Constructor.
/// @param context The context associated with the connection.
/// @param localEndpoint The local endpoint of the connection.
Connection(std::shared_ptr<Context> context, const Endpoint& localEndpoint)
: context_(context), localEndpoint_(localEndpoint), maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()) {}
Connection(std::shared_ptr<Context> context, const Endpoint& localEndpoint);
/// Destructor.
virtual ~Connection() = default;
@@ -506,7 +513,7 @@ class Connection {
/// Get the context associated with this connection.
/// @return A shared pointer to the context associated with this connection.
std::shared_ptr<Context> context() const { return context_; }
std::shared_ptr<Context> context() const;
/// Get the device used by the local endpoint.
/// @return The device used by the local endpoint.

View File

@@ -9,6 +9,7 @@
#include <hip/hip_runtime.h>
using cudaError_t = hipError_t;
using cudaEvent_t = hipEvent_t;
using cudaGraph_t = hipGraph_t;
using cudaGraphExec_t = hipGraphExec_t;
using cudaDeviceProp = hipDeviceProp_t;
@@ -24,6 +25,7 @@ using CUmemAllocationProp = hipMemAllocationProp;
using CUmemAccessDesc = hipMemAccessDesc;
using CUmemAllocationHandleType = hipMemAllocationHandleType;
constexpr auto cudaErrorPeerAccessAlreadyEnabled = hipErrorPeerAccessAlreadyEnabled;
constexpr auto cudaSuccess = hipSuccess;
constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking;
constexpr auto cudaStreamCaptureModeGlobal = hipStreamCaptureModeGlobal;
@@ -45,6 +47,12 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define CUDA_SUCCESS hipSuccess
#endif // CUDA_SUCCESS
#define cudaEventCreate(...) hipEventCreate(__VA_ARGS__)
#define cudaEventCreateWithFlags(...) hipEventCreateWithFlags(__VA_ARGS__)
#define cudaEventDestroy(...) hipEventDestroy(__VA_ARGS__)
#define cudaEventRecord(...) hipEventRecord(__VA_ARGS__)
#define cudaEventSynchronize(...) hipEventSynchronize(__VA_ARGS__)
#define cudaEventElapsedTime(...) hipEventElapsedTime(__VA_ARGS__)
#define cudaGetErrorString(...) hipGetErrorString(__VA_ARGS__)
#define cudaGetDevice(...) hipGetDevice(__VA_ARGS__)
#define cudaGetDeviceCount(...) hipGetDeviceCount(__VA_ARGS__)
@@ -53,6 +61,8 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cudaSetDevice(...) hipSetDevice(__VA_ARGS__)
#define cudaDeviceSynchronize(...) hipDeviceSynchronize(__VA_ARGS__)
#define cudaDeviceGetPCIBusId(...) hipDeviceGetPCIBusId(__VA_ARGS__)
#define cudaDeviceCanAccessPeer(...) hipDeviceCanAccessPeer(__VA_ARGS__)
#define cudaDeviceEnablePeerAccess(...) hipDeviceEnablePeerAccess(__VA_ARGS__)
#define cudaHostAlloc(...) hipHostMalloc(__VA_ARGS__)
#define cudaMalloc(...) hipMalloc(__VA_ARGS__)
#define cudaFree(...) hipFree(__VA_ARGS__)

View File

@@ -18,16 +18,16 @@ using SemaphoreId = uint32_t;
using MemoryId = uint32_t;
using TriggerType = uint64_t;
const TriggerType TriggerData = 0x1; // Trigger a data transfer.
const TriggerType TriggerFlag = 0x2; // Trigger a signaling.
const TriggerType TriggerSync = 0x4; // Trigger a flush.
constexpr TriggerType TriggerData = 0x1; // Trigger a data transfer.
constexpr TriggerType TriggerFlag = 0x2; // Trigger a signaling.
constexpr TriggerType TriggerSync = 0x4; // Trigger a flush.
#define MSCCLPP_BITS_SIZE 32
#define MSCCLPP_BITS_OFFSET 32
#define MSCCLPP_BITS_MEMORY_ID 9
#define MSCCLPP_BITS_TYPE 3
#define MSCCLPP_BITS_SEMAPHORE_ID 10
#define MSCCLPP_BITS_FIFO_RESERVED 1
constexpr unsigned int TriggerBitsSize = 32;
constexpr unsigned int TriggerBitsOffset = 32;
constexpr unsigned int TriggerBitsMemoryId = 9;
constexpr unsigned int TriggerBitsType = 3;
constexpr unsigned int TriggerBitsSemaphoreId = 10;
constexpr unsigned int TriggerBitsFifoReserved = 1;
/// Basic structure of each work element in the FIFO.
union ChannelTrigger {
@@ -35,18 +35,18 @@ union ChannelTrigger {
// The summation of number of bits must be 128 or less.
struct {
// First 64 bits: value[0]
uint64_t size : MSCCLPP_BITS_SIZE;
uint64_t srcOffset : MSCCLPP_BITS_OFFSET;
uint64_t : (64 - MSCCLPP_BITS_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment
uint64_t size : TriggerBitsSize;
uint64_t srcOffset : TriggerBitsOffset;
uint64_t : (64 - TriggerBitsSize - TriggerBitsOffset); // ensure 64-bit alignment
// Second 64 bits: value[1]
uint64_t dstOffset : MSCCLPP_BITS_OFFSET;
uint64_t srcMemoryId : MSCCLPP_BITS_MEMORY_ID;
uint64_t dstMemoryId : MSCCLPP_BITS_MEMORY_ID;
uint64_t type : MSCCLPP_BITS_TYPE;
uint64_t semaphoreId : MSCCLPP_BITS_SEMAPHORE_ID;
uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_MEMORY_ID - MSCCLPP_BITS_MEMORY_ID - MSCCLPP_BITS_TYPE -
MSCCLPP_BITS_SEMAPHORE_ID - MSCCLPP_BITS_FIFO_RESERVED); // ensure 64-bit alignment
uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED;
uint64_t dstOffset : TriggerBitsOffset;
uint64_t srcMemoryId : TriggerBitsMemoryId;
uint64_t dstMemoryId : TriggerBitsMemoryId;
uint64_t type : TriggerBitsType;
uint64_t semaphoreId : TriggerBitsSemaphoreId;
uint64_t : (64 - TriggerBitsOffset - TriggerBitsMemoryId - TriggerBitsMemoryId - TriggerBitsType -
TriggerBitsSemaphoreId - TriggerBitsFifoReserved); // ensure 64-bit alignment
uint64_t reserved : TriggerBitsFifoReserved;
} fields;
#if defined(MSCCLPP_DEVICE_COMPILE)
@@ -66,28 +66,28 @@ union ChannelTrigger {
/// @param semaphoreId The ID of the semaphore.
MSCCLPP_DEVICE_INLINE ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src,
uint64_t srcOffset, uint64_t bytes, int semaphoreId) {
MSCCLPP_ASSERT_DEVICE(type < (1ULL << MSCCLPP_BITS_TYPE), "type is too large");
MSCCLPP_ASSERT_DEVICE(dst < (1ULL << MSCCLPP_BITS_MEMORY_ID), "dst is too large");
MSCCLPP_ASSERT_DEVICE(dstOffset < (1ULL << MSCCLPP_BITS_OFFSET), "dstOffset is too large");
MSCCLPP_ASSERT_DEVICE(src < (1ULL << MSCCLPP_BITS_MEMORY_ID), "src is too large");
MSCCLPP_ASSERT_DEVICE(srcOffset < (1ULL << MSCCLPP_BITS_OFFSET), "srcOffset is too large");
MSCCLPP_ASSERT_DEVICE(type < (1ULL << TriggerBitsType), "type is too large");
MSCCLPP_ASSERT_DEVICE(dst < (1ULL << TriggerBitsMemoryId), "dst is too large");
MSCCLPP_ASSERT_DEVICE(dstOffset < (1ULL << TriggerBitsOffset), "dstOffset is too large");
MSCCLPP_ASSERT_DEVICE(src < (1ULL << TriggerBitsMemoryId), "src is too large");
MSCCLPP_ASSERT_DEVICE(srcOffset < (1ULL << TriggerBitsOffset), "srcOffset is too large");
MSCCLPP_ASSERT_DEVICE(bytes != 0, "bytes must not be zero");
MSCCLPP_ASSERT_DEVICE(bytes < (1ULL << MSCCLPP_BITS_SIZE), "bytes is too large");
MSCCLPP_ASSERT_DEVICE(semaphoreId < (1ULL << MSCCLPP_BITS_SEMAPHORE_ID), "semaphoreId is too large");
constexpr uint64_t maskSize = (1ULL << MSCCLPP_BITS_SIZE) - 1;
constexpr uint64_t maskSrcOffset = (1ULL << MSCCLPP_BITS_OFFSET) - 1;
constexpr uint64_t maskDstOffset = (1ULL << MSCCLPP_BITS_OFFSET) - 1;
constexpr uint64_t maskSrcMemoryId = (1ULL << MSCCLPP_BITS_MEMORY_ID) - 1;
constexpr uint64_t maskDstMemoryId = (1ULL << MSCCLPP_BITS_MEMORY_ID) - 1;
constexpr uint64_t maskType = (1ULL << MSCCLPP_BITS_TYPE) - 1;
constexpr uint64_t maskSemaphoreId = (1ULL << MSCCLPP_BITS_SEMAPHORE_ID) - 1;
value.fst = (((srcOffset & maskSrcOffset) << MSCCLPP_BITS_SIZE) + (bytes & maskSize));
value.snd = (((((((((semaphoreId & maskSemaphoreId) << MSCCLPP_BITS_TYPE) + ((uint64_t)type & maskType))
<< MSCCLPP_BITS_MEMORY_ID) +
MSCCLPP_ASSERT_DEVICE(bytes < (1ULL << TriggerBitsSize), "bytes is too large");
MSCCLPP_ASSERT_DEVICE(semaphoreId < (1ULL << TriggerBitsSemaphoreId), "semaphoreId is too large");
constexpr uint64_t maskSize = (1ULL << TriggerBitsSize) - 1;
constexpr uint64_t maskSrcOffset = (1ULL << TriggerBitsOffset) - 1;
constexpr uint64_t maskDstOffset = (1ULL << TriggerBitsOffset) - 1;
constexpr uint64_t maskSrcMemoryId = (1ULL << TriggerBitsMemoryId) - 1;
constexpr uint64_t maskDstMemoryId = (1ULL << TriggerBitsMemoryId) - 1;
constexpr uint64_t maskType = (1ULL << TriggerBitsType) - 1;
constexpr uint64_t maskSemaphoreId = (1ULL << TriggerBitsSemaphoreId) - 1;
value.fst = (((srcOffset & maskSrcOffset) << TriggerBitsSize) + (bytes & maskSize));
value.snd = (((((((((semaphoreId & maskSemaphoreId) << TriggerBitsType) + ((uint64_t)type & maskType))
<< TriggerBitsMemoryId) +
(dst & maskDstMemoryId))
<< MSCCLPP_BITS_MEMORY_ID) +
<< TriggerBitsMemoryId) +
(src & maskSrcMemoryId))
<< MSCCLPP_BITS_OFFSET) +
<< TriggerBitsOffset) +
(dstOffset & maskDstOffset));
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)

View File

@@ -77,7 +77,6 @@ void register_core(nb::module_& m) {
nb::enum_<Transport>(m, "Transport")
.value("Unknown", Transport::Unknown)
.value("CudaIpc", Transport::CudaIpc)
.value("Nvls", Transport::Nvls)
.value("IB0", Transport::IB0)
.value("IB1", Transport::IB1)
.value("IB2", Transport::IB2)

View File

@@ -33,6 +33,11 @@ std::shared_ptr<RegisteredMemory::Impl> Connection::getImpl(RegisteredMemory& me
std::shared_ptr<Endpoint::Impl> Connection::getImpl(Endpoint& memory) { return memory.pimpl_; }
MSCCLPP_API_CPP Connection::Connection(std::shared_ptr<Context> context, const Endpoint& localEndpoint)
: context_(context), localEndpoint_(localEndpoint), maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()) {}
MSCCLPP_API_CPP std::shared_ptr<Context> Connection::context() const { return context_; }
MSCCLPP_API_CPP const Device& Connection::localDevice() const { return localEndpoint_.device(); }
MSCCLPP_API_CPP int Connection::getMaxWriteQueueSize() const { return maxWriteQueueSize_; }

View File

@@ -77,6 +77,29 @@ MSCCLPP_API_CPP std::shared_ptr<Connection> Context::connect(Endpoint localEndpo
int deviceId;
if (localEndpoint.device().type == DeviceType::GPU) {
deviceId = localEndpoint.device().id;
if (remoteEndpoint.device().type == DeviceType::GPU && localEndpoint.hostHash() == remoteEndpoint.hostHash() &&
localEndpoint.pidHash() == remoteEndpoint.pidHash()) {
// Connecting two GPUs in the same process - need to enable peer access explicitly
if (deviceId < 0) {
throw Error("No GPU device ID provided for local endpoint", ErrorCode::InvalidUsage);
}
int remoteDeviceId = remoteEndpoint.device().id;
if (remoteDeviceId < 0) {
throw Error("No GPU device ID provided for remote endpoint", ErrorCode::InvalidUsage);
}
int originalDeviceId;
MSCCLPP_CUDATHROW(cudaGetDevice(&originalDeviceId));
if (originalDeviceId != deviceId) {
MSCCLPP_CUDATHROW(cudaSetDevice(deviceId));
}
auto ret = cudaDeviceEnablePeerAccess(remoteDeviceId, 0);
if (ret != cudaSuccess && ret != cudaErrorPeerAccessAlreadyEnabled) {
MSCCLPP_CUDATHROW(ret);
}
if (originalDeviceId != deviceId) {
MSCCLPP_CUDATHROW(cudaSetDevice(originalDeviceId));
}
}
} else if (remoteEndpoint.device().type == DeviceType::GPU) {
deviceId = remoteEndpoint.device().id;
} else {

View File

@@ -17,6 +17,7 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl)
: transport_(config.transport),
device_(config.device),
hostHash_(getHostHash()),
pidHash_(getPidHash()),
maxWriteQueueSize_(config.maxWriteQueueSize) {
if (device_.type == DeviceType::GPU && device_.id < 0) {
MSCCLPP_CUDATHROW(cudaGetDevice(&(device_.id)));
@@ -44,6 +45,7 @@ Endpoint::Impl::Impl(const std::vector<char>& serialization) {
it = detail::deserialize(it, transport_);
it = detail::deserialize(it, device_);
it = detail::deserialize(it, hostHash_);
it = detail::deserialize(it, pidHash_);
if (AllIBTransports.has(transport_)) {
ibLocal_ = false;
it = detail::deserialize(it, ibQpInfo_);
@@ -59,6 +61,10 @@ MSCCLPP_API_CPP Transport Endpoint::transport() const { return pimpl_->transport
MSCCLPP_API_CPP const Device& Endpoint::device() const { return pimpl_->device_; }
MSCCLPP_API_CPP uint64_t Endpoint::hostHash() const { return pimpl_->hostHash_; }
MSCCLPP_API_CPP uint64_t Endpoint::pidHash() const { return pimpl_->pidHash_; }
MSCCLPP_API_CPP int Endpoint::maxWriteQueueSize() const { return pimpl_->maxWriteQueueSize_; }
MSCCLPP_API_CPP std::vector<char> Endpoint::serialize() const {
@@ -66,6 +72,7 @@ MSCCLPP_API_CPP std::vector<char> Endpoint::serialize() const {
detail::serialize(data, pimpl_->transport_);
detail::serialize(data, pimpl_->device_);
detail::serialize(data, pimpl_->hostHash_);
detail::serialize(data, pimpl_->pidHash_);
if (AllIBTransports.has(pimpl_->transport_)) {
detail::serialize(data, pimpl_->ibQpInfo_);
}

View File

@@ -21,6 +21,7 @@ struct Endpoint::Impl {
Transport transport_;
Device device_;
uint64_t hostHash_;
uint64_t pidHash_;
int maxWriteQueueSize_;
// The following are only used for IB and are undefined for other transports.

View File

@@ -68,7 +68,7 @@ RegisteredMemory::Impl::Impl(void* data, size_t size, TransportFlags transports,
} else {
transportInfo.rootPid = getpid();
if (transportInfo.rootPid < 0) {
throw mscclpp::SysError("getpid() failed", errno);
throw SysError("getpid() failed", errno);
}
MSCCLPP_CUTHROW(cuMemExportToShareableHandle(&transportInfo.fileDesc, handle, getNvlsMemHandleType(), 0));
this->fileDesc = transportInfo.fileDesc;