This commit is contained in:
Changho Hwang
2023-03-27 14:09:26 +00:00
parent 8e4146aba9
commit 8fc8f5b4fe
4 changed files with 307 additions and 213 deletions

View File

@@ -4,33 +4,37 @@
* See LICENSE.txt for license information
************************************************************************/
#include "core.h"
#include "debug.h"
#include <stdlib.h>
#include "core.h"
#include <stdarg.h>
#include <stdlib.h>
#include <sys/syscall.h>
int mscclppDebugLevel = -1;
static int pid = -1;
static char hostname[1024];
thread_local int mscclppDebugNoWarn = 0;
char mscclppLastError[1024] = ""; // Global string for the last error in human readable form
char mscclppLastError[1024] = ""; // Global string for the last error in human readable form
uint64_t mscclppDebugMask = MSCCLPP_INIT; // Default debug sub-system mask is INIT
FILE *mscclppDebugFile = stdout;
FILE* mscclppDebugFile = stdout;
mscclppLogHandler_t mscclppDebugLogHandler = NULL;
pthread_mutex_t mscclppDebugLock = PTHREAD_MUTEX_INITIALIZER;
std::chrono::steady_clock::time_point mscclppEpoch;
static __thread int tid = -1;
void mscclppDebugDefaultLogHandler(int, unsigned long, const char *msg)
void mscclppDebugDefaultLogHandler(int, unsigned long, const char* msg)
{
fwrite(msg, 1, strlen(msg), mscclppDebugFile);
}
void mscclppDebugInit() {
void mscclppDebugInit()
{
pthread_mutex_lock(&mscclppDebugLock);
if (mscclppDebugLevel != -1) { pthread_mutex_unlock(&mscclppDebugLock); return; }
if (mscclppDebugLevel != -1) {
pthread_mutex_unlock(&mscclppDebugLock);
return;
}
const char* mscclpp_debug = getenv("MSCCLPP_DEBUG");
int tempNcclDebugLevel = -1;
if (mscclpp_debug == NULL) {
@@ -54,10 +58,13 @@ void mscclppDebugInit() {
char* mscclppDebugSubsysEnv = getenv("MSCCLPP_DEBUG_SUBSYS");
if (mscclppDebugSubsysEnv != NULL) {
int invert = 0;
if (mscclppDebugSubsysEnv[0] == '^') { invert = 1; mscclppDebugSubsysEnv++; }
if (mscclppDebugSubsysEnv[0] == '^') {
invert = 1;
mscclppDebugSubsysEnv++;
}
mscclppDebugMask = invert ? ~0ULL : 0ULL;
char *mscclppDebugSubsys = strdup(mscclppDebugSubsysEnv);
char *subsys = strtok(mscclppDebugSubsys, ",");
char* mscclppDebugSubsys = strdup(mscclppDebugSubsysEnv);
char* subsys = strtok(mscclppDebugSubsys, ",");
while (subsys != NULL) {
uint64_t mask = 0;
if (strcasecmp(subsys, "INIT") == 0) {
@@ -84,7 +91,10 @@ void mscclppDebugInit() {
mask = MSCCLPP_ALL;
}
if (mask) {
if (invert) mscclppDebugMask &= ~mask; else mscclppDebugMask |= mask;
if (invert)
mscclppDebugMask &= ~mask;
else
mscclppDebugMask |= mask;
}
subsys = strtok(NULL, ",");
}
@@ -102,32 +112,32 @@ void mscclppDebugInit() {
const char* mscclppDebugFileEnv = getenv("MSCCLPP_DEBUG_FILE");
if (tempNcclDebugLevel > MSCCLPP_LOG_VERSION && mscclppDebugFileEnv != NULL) {
int c = 0;
char debugFn[PATH_MAX+1] = "";
char *dfn = debugFn;
char debugFn[PATH_MAX + 1] = "";
char* dfn = debugFn;
while (mscclppDebugFileEnv[c] != '\0' && c < PATH_MAX) {
if (mscclppDebugFileEnv[c++] != '%') {
*dfn++ = mscclppDebugFileEnv[c-1];
*dfn++ = mscclppDebugFileEnv[c - 1];
continue;
}
switch (mscclppDebugFileEnv[c++]) {
case '%': // Double %
*dfn++ = '%';
break;
case 'h': // %h = hostname
dfn += snprintf(dfn, PATH_MAX, "%s", hostname);
break;
case 'p': // %p = pid
dfn += snprintf(dfn, PATH_MAX, "%d", pid);
break;
default: // Echo everything we don't understand
*dfn++ = '%';
*dfn++ = mscclppDebugFileEnv[c-1];
break;
case '%': // Double %
*dfn++ = '%';
break;
case 'h': // %h = hostname
dfn += snprintf(dfn, PATH_MAX, "%s", hostname);
break;
case 'p': // %p = pid
dfn += snprintf(dfn, PATH_MAX, "%d", pid);
break;
default: // Echo everything we don't understand
*dfn++ = '%';
*dfn++ = mscclppDebugFileEnv[c - 1];
break;
}
}
*dfn = '\0';
if (debugFn[0] != '\0') {
FILE *file = fopen(debugFn, "w");
FILE* file = fopen(debugFn, "w");
if (file != nullptr) {
setbuf(file, nullptr); // disable buffering
mscclppDebugFile = file;
@@ -146,20 +156,27 @@ void mscclppDebugInit() {
* Also exported to the dynamically loadable Net transport modules so
* they can share the debugging mechanisms and output files
*/
void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char *filefunc, int line, const char *fmt, ...) {
if (__atomic_load_n(&mscclppDebugLevel, __ATOMIC_ACQUIRE) == -1) mscclppDebugInit();
if (mscclppDebugNoWarn != 0 && level == MSCCLPP_LOG_WARN) { level = MSCCLPP_LOG_INFO; flags = mscclppDebugNoWarn; }
void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char* filefunc, int line, const char* fmt,
...)
{
if (__atomic_load_n(&mscclppDebugLevel, __ATOMIC_ACQUIRE) == -1)
mscclppDebugInit();
if (mscclppDebugNoWarn != 0 && level == MSCCLPP_LOG_WARN) {
level = MSCCLPP_LOG_INFO;
flags = mscclppDebugNoWarn;
}
// Save the last error (WARN) as a human readable string
if (level == MSCCLPP_LOG_WARN) {
pthread_mutex_lock(&mscclppDebugLock);
va_list vargs;
va_start(vargs, fmt);
(void) vsnprintf(mscclppLastError, sizeof(mscclppLastError), fmt, vargs);
(void)vsnprintf(mscclppLastError, sizeof(mscclppLastError), fmt, vargs);
va_end(vargs);
pthread_mutex_unlock(&mscclppDebugLock);
}
if (mscclppDebugLevel < level || ((flags & mscclppDebugMask) == 0)) return;
if (mscclppDebugLevel < level || ((flags & mscclppDebugMask) == 0))
return;
if (tid == -1) {
tid = syscall(SYS_gettid);
@@ -173,23 +190,23 @@ void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char
char buffer[1024];
size_t len = 0;
if (level == MSCCLPP_LOG_WARN) {
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] %s:%d MSCCLPP WARN ",
hostname, pid, tid, cudaDev, filefunc, line);
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] %s:%d MSCCLPP WARN ", hostname, pid, tid, cudaDev, filefunc,
line);
} else if (level == MSCCLPP_LOG_INFO) {
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] MSCCLPP INFO ", hostname, pid, tid, cudaDev);
} else if (level == MSCCLPP_LOG_TRACE && flags == MSCCLPP_CALL) {
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d MSCCLPP CALL ", hostname, pid, tid);
} else if (level == MSCCLPP_LOG_TRACE) {
auto delta = std::chrono::steady_clock::now() - mscclppEpoch;
double timestamp = std::chrono::duration_cast<std::chrono::duration<double>>(delta).count()*1000;
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] %f %s:%d MSCCLPP TRACE ",
hostname, pid, tid, cudaDev, timestamp, filefunc, line);
double timestamp = std::chrono::duration_cast<std::chrono::duration<double>>(delta).count() * 1000;
len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] %f %s:%d MSCCLPP TRACE ", hostname, pid, tid, cudaDev,
timestamp, filefunc, line);
}
if (len) {
va_list vargs;
va_start(vargs, fmt);
len += vsnprintf(buffer+len, sizeof(buffer)-len, fmt, vargs);
len += vsnprintf(buffer + len, sizeof(buffer) - len, fmt, vargs);
va_end(vargs);
buffer[len++] = '\n';
mscclppDebugLogHandler(level, flags, buffer);
@@ -198,8 +215,10 @@ void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char
mscclppResult_t mscclppDebugSetLogHandler(mscclppLogHandler_t handler)
{
if (__atomic_load_n(&mscclppDebugLevel, __ATOMIC_ACQUIRE) == -1) mscclppDebugInit();
if (handler == NULL) return mscclppInvalidArgument;
if (__atomic_load_n(&mscclppDebugLevel, __ATOMIC_ACQUIRE) == -1)
mscclppDebugInit();
if (handler == NULL)
return mscclppInvalidArgument;
pthread_mutex_lock(&mscclppDebugLock);
mscclppDebugLogHandler = handler;
pthread_mutex_unlock(&mscclppDebugLock);
@@ -208,11 +227,13 @@ mscclppResult_t mscclppDebugSetLogHandler(mscclppLogHandler_t handler)
MSCCLPP_PARAM(SetThreadName, "SET_THREAD_NAME", 0);
void mscclppSetThreadName(pthread_t thread, const char *fmt, ...) {
void mscclppSetThreadName(pthread_t thread, const char* fmt, ...)
{
// pthread_setname_np is nonstandard GNU extension
// needs the following feature test macro
#ifdef _GNU_SOURCE
if (mscclppParamSetThreadName() != 1) return;
if (mscclppParamSetThreadName() != 1)
return;
char threadName[MSCCLPP_THREAD_NAMELEN];
va_list vargs;
va_start(vargs, fmt);

View File

@@ -8,28 +8,50 @@
#define MSCCLPP_DEBUG_H_
#include "mscclpp.h"
#include <stdio.h>
#include <chrono>
#include <stdio.h>
#include <type_traits>
#include <limits.h>
#include <string.h>
#include <pthread.h>
#include <string.h>
// Conform to pthread and NVTX standard
#define MSCCLPP_THREAD_NAMELEN 16
typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel;
typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys;
typedef enum
{
MSCCLPP_LOG_NONE = 0,
MSCCLPP_LOG_VERSION = 1,
MSCCLPP_LOG_WARN = 2,
MSCCLPP_LOG_INFO = 3,
MSCCLPP_LOG_ABORT = 4,
MSCCLPP_LOG_TRACE = 5
} mscclppDebugLogLevel;
typedef enum
{
MSCCLPP_INIT = 1,
MSCCLPP_COLL = 2,
MSCCLPP_P2P = 4,
MSCCLPP_SHM = 8,
MSCCLPP_NET = 16,
MSCCLPP_GRAPH = 32,
MSCCLPP_TUNING = 64,
MSCCLPP_ENV = 128,
MSCCLPP_ALLOC = 256,
MSCCLPP_CALL = 512,
MSCCLPP_ALL = ~0
} mscclppDebugLogSubSys;
extern int mscclppDebugLevel;
extern uint64_t mscclppDebugMask;
extern pthread_mutex_t mscclppDebugLock;
extern FILE *mscclppDebugFile;
extern FILE* mscclppDebugFile;
extern mscclppResult_t getHostName(char* hostname, int maxlen, const char delim);
void mscclppDebugDefaultLogHandler(int, unsigned long, const char *msg);
void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char *filefunc, int line, const char *fmt, ...) __attribute__ ((format (printf, 5, 6)));
void mscclppDebugDefaultLogHandler(int, unsigned long, const char* msg);
void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char* filefunc, int line, const char* fmt,
...) __attribute__((format(printf, 5, 6)));
mscclppResult_t mscclppDebugSetLogHandler(mscclppLogHandler_t handler);
// Let code temporarily downgrade WARN into INFO
@@ -47,6 +69,6 @@ extern std::chrono::steady_clock::time_point mscclppEpoch;
#define TRACE(...)
#endif
void mscclppSetThreadName(pthread_t thread, const char *fmt, ...);
void mscclppSetThreadName(pthread_t thread, const char* fmt, ...);
#endif

View File

@@ -17,47 +17,47 @@ extern "C" {
/***************************************************************************************************************
* A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand.
* The communication API is one-sided meaning that for every single data transfer, only one side
* needs to execute unlike a two-sided communication stack such as NCCL where both sides
* needs to execute unlike a two-sided communication stack such as NCCL where both sides
* need to execute a send and a receive instruction, respectively, for every transfer.
*
* A connection is uniquely identified by the (remoteRank, tag) pair at an endpoint.
* The two endpoints register buffers of the same size with the connection.
*
*
* A connection is uniquely identified by the (remoteRank, tag) pair at an endpoint.
* The two endpoints register buffers of the same size with the connection.
*
* The endpoints provide the remoteRank, tag, and the buffer when registering a connection with msccppConnect().
*
* mscllppConnectionSetup() sets up all the registered connections.
*
*
* mscllppConnectionSetup() sets up all the registered connections.
*
***************************************************************************************************************
* A proxy thread running on the CPU is necessary to perform transfers using InfiniBand or the DMA engine.
* A proxy thread running on the CPU is necessary to perform transfers using InfiniBand or the DMA engine.
* The current implementation uses a single proxy thread per context - one IB connection or DMA engine per node.
* Thus multiple threadblocks using different connections might use the same CPU proxy thread.
*
* Thus multiple threadblocks using different connections might use the same CPU proxy thread.
*
* Before using any of functionality of connections, mscclppProxyLaunch needs to be called to spawn the
* proxy threads. There are currently two types of connections:
*
*
* P2P via NVLink: the DMA engine can perform the copy between the buffers. DMA engine has higher latency
* but has a higher bandwidth and costs no compute cycles on the GPU.
*
*
* InfiniBand: the RDMA engine copies the data over MLX devices.
*
*
***************************************************************************************************************
* At the runtime, a GPU kernel has access to a mscclppDevConn object that provides the following functions:
*
*
* put(): the sender initiates a data transfer to the receiver.
*
*
* signal(): the sender signals the receiver that data is ready to be consumed.
*
*
* wait(): the reciever waits on the signal() to start reading the data.
*
*
* The sender should not reuse the buffer till the signal returns.
* The receiver should only access the data after the wait returns.
*
*
* putWithSignal(): the sender initiates a data transfer and signals the receiver that data is ready to be consumed.
* This is an optimized version of a put followed by a signal.
*
* These functions hide the complexity of syncrhonization between the two GPUs and the CPU proxy thread.
*
* These functions hide the complexity of syncrhonization between the two GPUs and the CPU proxy thread.
* Example:
*
*
* // sender GPU
* devConn.put(data1)
* // not OK to write to data1
@@ -67,43 +67,54 @@ extern "C" {
* // not OK to write to data1, data2, data3 // not OK to read data1, data2, data3
* devConn.signal() -------------------------------> devConn.wait()
* // OK to write to data1, data2, data3 // OK to read data1, data2, data3
*
*
*
*
* The two endpoint can concurrently use the same connection provided they are writing (puts) on different
* indices in the registered buffer.
* indices in the registered buffer.
**************************************************************************************************************/
struct mscclppDevConn {
struct mscclppDevConn
{
#ifdef __CUDACC__
__forceinline__ __device__ void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){
__forceinline__ __device__ void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
{
fifo.push(mscclppData, dstDataOffset, srcDataOffset, dataSize);
}
__forceinline__ __device__ void put(uint64_t dataOffset, uint64_t dataSize){
__forceinline__ __device__ void put(uint64_t dataOffset, uint64_t dataSize)
{
put(dataOffset, dataOffset, dataSize);
}
__forceinline__ __device__ void signal(){
__forceinline__ __device__ void signal()
{
epochIncrement();
uint64_t curFifoHead = fifo.push(mscclppFlag | mscclppSync, 0, 0, 1);
while (*(volatile uint64_t *)fifo.triggerFifoTail <= curFifoHead);
while (*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
__forceinline__ __device__ void putWithSignal(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){
__forceinline__ __device__ void putWithSignal(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
{
epochIncrement();
uint64_t curFifoHead = fifo.push(mscclppData | mscclppFlag | mscclppSync, dstDataOffset, srcDataOffset, dataSize);
while (*(volatile uint64_t *)fifo.triggerFifoTail <= curFifoHead);
while (*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
__forceinline__ __device__ void putWithSignal(uint64_t dataOffset, uint64_t dataSize){
__forceinline__ __device__ void putWithSignal(uint64_t dataOffset, uint64_t dataSize)
{
putWithSignal(dataOffset, dataOffset, dataSize);
}
__forceinline__ __device__ void wait(){
__forceinline__ __device__ void wait()
{
(*recvEpochId) += 1;
while (*(volatile uint64_t*)proxyEpochId < (*recvEpochId));
while (*(volatile uint64_t*)proxyEpochId < (*recvEpochId))
;
}
__forceinline__ __device__ void epochIncrement(){
__forceinline__ __device__ void epochIncrement()
{
*(volatile uint64_t*)sendEpochId += 1;
}
@@ -127,18 +138,24 @@ typedef struct mscclppComm* mscclppComm_t;
typedef struct mscclppDevConn mscclppDevConn_t;
#define MSCCLPP_UNIQUE_ID_BYTES 128
typedef struct { char internal[MSCCLPP_UNIQUE_ID_BYTES]; } mscclppUniqueId;
typedef struct
{
char internal[MSCCLPP_UNIQUE_ID_BYTES];
} mscclppUniqueId;
/* Error type */
typedef enum { mscclppSuccess = 0,
mscclppUnhandledCudaError = 1,
mscclppSystemError = 2,
mscclppInternalError = 3,
mscclppInvalidArgument = 4,
mscclppInvalidUsage = 5,
mscclppRemoteError = 6,
mscclppInProgress = 7,
mscclppNumResults = 8 } mscclppResult_t;
typedef enum
{
mscclppSuccess = 0,
mscclppUnhandledCudaError = 1,
mscclppSystemError = 2,
mscclppInternalError = 3,
mscclppInvalidArgument = 4,
mscclppInvalidUsage = 5,
mscclppRemoteError = 6,
mscclppInProgress = 7,
mscclppNumResults = 8
} mscclppResult_t;
/* Create a unique ID for communication. Only needs to be called by one process.
* Use with mscclppCommInitRankFromId().
@@ -150,16 +167,18 @@ typedef enum { mscclppSuccess = 0,
mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* uniqueId);
/* Transport Types */
typedef enum { mscclppTransportP2P = 0,
mscclppTransportSHM = 1, // TODO(chhwang): not implemented yet
mscclppTransportIB = 2,
typedef enum
{
mscclppTransportP2P = 0,
mscclppTransportSHM = 1, // TODO(chhwang): not implemented yet
mscclppTransportIB = 2,
} mscclppTransport_t;
/* Initialize a communicator. nranks processes with rank 0 to nranks-1 need to call this function.
*
*
* Outputs:
* comm: the communicator to be initialized
*
*
* Inputs:
* nranks: number of ranks in the communicator
* ipPortPair: a string of the form "ip:port" that represents the address of the root process
@@ -169,10 +188,10 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char*
/* Initialize a communicator from a given mscclppUniqueId. Same as mscclppCommInitRank() except that
* id is provided by the user by calling mscclppGetUniqueId()
*
*
* Outputs:
* comm: the communicator to be initialized
*
*
* Inputs:
* nranks: number of ranks in the communicator
* id: the unique ID to be used for communication
@@ -181,10 +200,10 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char*
mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank);
/* Ring-based AllGather through the bootstrap socket.
*
*
* Outputs:
* comm: the communicator
*
*
* Inputs:
* data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r`
* size: data size per rank
@@ -192,26 +211,26 @@ mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, msccl
mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size);
/* Destroy a communicator.
*
*
* Inputs:
* comm: the communicator to be destroyed
*/
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm);
/* Return the string for the given error code.
*
*
* Ouput:
* returns the string
*
*
* Inputs:
* result: the error code that this function needs to translate
*/
const char* mscclppGetErrorString(mscclppResult_t result);
const char* mscclppGetErrorString(mscclppResult_t result);
/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection
* is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection
* from rank i to remote rank j needs to have a counterpart from rank j to rank i.
*
*
* Inputs:
* comm: the communicator
* remoteRank: the rank of the remote process
@@ -223,11 +242,11 @@ const char* mscclppGetErrorString(mscclppResult_t result);
* ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P.
*/
mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize,
mscclppTransport_t transportType, const char *ibDev=0);
mscclppTransport_t transportType, const char* ibDev = 0);
/* Establish all connections declared by mscclppConnect(). This function must be called after all mscclppConnect()
* calls are made. This function ensures that all remote ranks are ready to communicate when it returns.
*
*
* Inputs:
* comm: the communicator
*/
@@ -235,22 +254,22 @@ mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm);
/* Return an array of mscclppDevConn_t and the number of connections created by mscclppConnectionSetup().
* The order of connections matches the order of mscclppConnect() calls.
*
*
* Outputs:
* devConns: the array of mscclppDevConn_t. Each mscclppDevConn_t corresponds to a mscclppConnect() call in the
* order of the calls.
* nConns: the number of connections
*
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns);
/* Return the mscclppDevConn_t corresponding to a given tag and a remoteRank.
*
*
* Outputs:
* devConn: the mscclppDevConn_t corresponding to the given tag
*
*
* Inputs:
* comm: the communicator
* tag: the tag of the connection
@@ -258,37 +277,37 @@ mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevCon
*/
mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn);
/* Launch proxy threads for all connections created by mscclppConnectionSetup(). This function is supposed to be called
* before starting a kernel that uses mscclppDevConn_t. Up to two proxy threads are launched for each (GPU + IB) pair
* (one for P2P NVLink and one for InfiniBand).
*
/* Launch proxy threads for all connections created by mscclppConnectionSetup(). This function is supposed to be
* called before starting a kernel that uses mscclppDevConn_t. Up to two proxy threads are launched for each (GPU +
* IB) pair (one for P2P NVLink and one for InfiniBand).
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm);
/* Stop all proxy threads.
*
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppProxyStop(mscclppComm_t comm);
/* Return the rank of the calling process.
*
*
* Outputs:
* rank: the rank of the calling process
*
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppCommRank(mscclppComm_t comm, int* rank);
/* Return the number of ranks of the communicator.
*
*
* Outputs:
* size: the number of ranks of the communicator
*
*
* Inputs:
* comm: the communicator
*/
@@ -298,16 +317,16 @@ mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size);
typedef void (*mscclppLogHandler_t)(int level, unsigned long flags, const char* msg);
/* The default log handler.
*
*
* Inputs:
* level(unused): the log level
* flags(unused): the log flags
* msg: the log message
*/
void mscclppDefaultLogHandler(int level, unsigned long flags, const char *msg);
void mscclppDefaultLogHandler(int level, unsigned long flags, const char* msg);
/* Set a custom log handler.
*
*
* Inputs:
* handler: the log handler function
*/

View File

@@ -1,17 +1,18 @@
#include "mscclpp.h"
#include "bootstrap.h"
#include "core.h"
#include "gdr.h"
#include "mscclpp.h"
#include <map>
#include <sstream>
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
static uint64_t hashUniqueId(mscclppUniqueId const &id) {
char const *bytes = (char const*)&id;
static uint64_t hashUniqueId(mscclppUniqueId const& id)
{
char const* bytes = (char const*)&id;
uint64_t h = 0xdeadbeef;
for(int i=0; i < (int)sizeof(mscclppUniqueId); i++) {
for (int i = 0; i < (int)sizeof(mscclppUniqueId); i++) {
h ^= h >> 32;
h *= 0x8db3db47fa2994ad;
h += bytes[i];
@@ -25,7 +26,8 @@ static bool initialized = false;
gdr_t mscclppGdrCopy = NULL;
mscclppResult_t initGdrCopy() {
mscclppResult_t initGdrCopy()
{
mscclppGdrCopy = mscclppGdrInit();
if (mscclppGdrCopy == NULL) {
WARN("GDR init failed");
@@ -34,8 +36,10 @@ mscclppResult_t initGdrCopy() {
return mscclppSuccess;
}
static mscclppResult_t mscclppInit() {
if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE)) return mscclppSuccess;
static mscclppResult_t mscclppInit()
{
if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE))
return mscclppSuccess;
pthread_mutex_lock(&initLock);
if (!initialized) {
// initEnv();
@@ -62,22 +66,25 @@ static std::string mscclppShmFileName(mscclppComm_t comm, int rank)
}
MSCCLPP_API(mscclppResult_t, mscclppGetUniqueId, mscclppUniqueId* out);
mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* out) {
mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* out)
{
MSCCLPPCHECK(mscclppInit());
// mscclppCHECK(PtrCheck(out, "GetUniqueId", "out"));
// mscclppCHECK(PtrCheck(out, "GetUniqueId", "out"));
mscclppResult_t res = bootstrapGetUniqueId((struct mscclppBootstrapHandle*)out);
TRACE_CALL("mscclppGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out));
return res;
}
MSCCLPP_API(mscclppResult_t, mscclppBootstrapAllGather, mscclppComm_t comm, void* data, int size);
mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size){
mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size)
{
MSCCLPPCHECK(bootstrapAllGather(comm->bootstrap, data, size));
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank);
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank) {
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank)
{
if (mscclppGdrCopy == NULL) {
MSCCLPPCHECK(initGdrCopy());
}
@@ -99,7 +106,7 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char*
MSCCLPPCHECK(bootstrapGetUniqueId(&handle, rank == 0, ipPortPair));
_comm->magic = handle.magic;
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECK(bootstrapInit(&handle, _comm));
#if defined(ENABLE_NPKIT)
@@ -142,15 +149,18 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char*
return res;
fail:
if (_comm) {
if (_comm->abortFlag) mscclppCudaHostFree((void *)_comm->abortFlag);
if (_comm->abortFlag)
mscclppCudaHostFree((void*)_comm->abortFlag);
free(_comm);
}
if (comm) *comm = NULL;
if (comm)
*comm = NULL;
return res;
}
MSCCLPP_API(mscclppResult_t, mscclppCommInitRankFromId, mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank);
mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank) {
mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank)
{
if (mscclppGdrCopy == NULL) {
MSCCLPPCHECK(initGdrCopy());
}
@@ -168,7 +178,7 @@ mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, msccl
MSCCLPPCHECK(bootstrapNetInit());
_comm->magic = handle->magic;
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECK(bootstrapInit(handle, _comm));
#if defined(ENABLE_NPKIT)
@@ -180,15 +190,18 @@ mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, msccl
return res;
fail:
if (_comm) {
if (_comm->abortFlag) mscclppCudaHostFree((void *)_comm->abortFlag);
if (_comm->abortFlag)
mscclppCudaHostFree((void*)_comm->abortFlag);
free(_comm);
}
if (comm) *comm = NULL;
if (comm)
*comm = NULL;
return res;
}
MSCCLPP_API(mscclppResult_t, mscclppCommDestroy, mscclppComm_t comm);
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm)
{
#if defined(ENABLE_NPKIT)
const char* npkitDumpDir = nullptr;
#endif
@@ -197,7 +210,7 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
return mscclppSuccess;
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppConn* conn = &comm->conns[i];
if (conn->cpuProxyFlagGdrDesc) {
// IB
MSCCLPPCHECK(mscclppGdrCudaFree(conn->cpuProxyFlagGdrDesc));
@@ -208,7 +221,7 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
}
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState *proxyState = comm->proxyState[i];
struct mscclppProxyState* proxyState = comm->proxyState[i];
if (proxyState) {
MSCCLPPCHECK(mscclppGdrCudaFree(proxyState->triggerFifo.desc));
MSCCLPPCHECK(mscclppGdrCudaFree(proxyState->fifoHead.desc));
@@ -227,9 +240,9 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
}
}
for (int i = 0; i < comm->nConns; i++){
struct mscclppConn *conn = &comm->conns[i];
if (conn){
for (int i = 0; i < comm->nConns; i++) {
struct mscclppConn* conn = &comm->conns[i];
if (conn) {
MSCCLPPCHECK(mscclppCudaFree(conn->devConn->sendEpochId));
MSCCLPPCHECK(mscclppCudaFree(conn->devConn->recvEpochId));
}
@@ -238,7 +251,7 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
if (comm->bootstrap)
MSCCLPPCHECK(bootstrapClose(comm->bootstrap));
mscclppCudaHostFree((void *)comm->abortFlag);
mscclppCudaHostFree((void*)comm->abortFlag);
free(comm);
#if defined(ENABLE_NPKIT)
@@ -256,24 +269,36 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
}
MSCCLPP_API(const char*, mscclppGetErrorString, mscclppResult_t code);
const char* mscclppGetErrorString(mscclppResult_t code) {
const char* mscclppGetErrorString(mscclppResult_t code)
{
switch (code) {
case mscclppSuccess : return "no error";
case mscclppUnhandledCudaError : return "unhandled cuda error";
case mscclppSystemError : return "unhandled system error";
case mscclppInternalError : return "internal error";
case mscclppInvalidArgument : return "invalid argument";
case mscclppInvalidUsage : return "invalid usage";
case mscclppRemoteError : return "remote process exited or there was a network error";
case mscclppInProgress : return "MSCCL++ operation in progress";
default : return "unknown result code";
case mscclppSuccess:
return "no error";
case mscclppUnhandledCudaError:
return "unhandled cuda error";
case mscclppSystemError:
return "unhandled system error";
case mscclppInternalError:
return "internal error";
case mscclppInvalidArgument:
return "invalid argument";
case mscclppInvalidUsage:
return "invalid usage";
case mscclppRemoteError:
return "remote process exited or there was a network error";
case mscclppInProgress:
return "MSCCL++ operation in progress";
default:
return "unknown result code";
}
}
MSCCLPP_API(mscclppResult_t, mscclppGetDeviceConnection, mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn);
mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn){
for (int i = 0; i < comm->nConns; i++){
if (comm->devConns[i].remoteRank == remoteRank && comm->devConns[i].tag == tag){
MSCCLPP_API(mscclppResult_t, mscclppGetDeviceConnection, mscclppComm_t comm, int remoteRank, int tag,
mscclppDevConn_t** devConn);
mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn)
{
for (int i = 0; i < comm->nConns; i++) {
if (comm->devConns[i].remoteRank == remoteRank && comm->devConns[i].tag == tag) {
*devConn = &comm->devConns[i];
return mscclppSuccess;
}
@@ -282,8 +307,8 @@ mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, i
return mscclppInvalidArgument;
}
MSCCLPP_API(mscclppResult_t, mscclppGetAllDeviceConnections, mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns);
MSCCLPP_API(mscclppResult_t, mscclppGetAllDeviceConnections, mscclppComm_t comm, mscclppDevConn_t** devConns,
int* nConns);
mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns)
{
*nConns = comm->nConns;
@@ -291,17 +316,16 @@ mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevCon
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int remoteRank, int tag,
void* localBuff, uint64_t buffSize, mscclppTransport_t transportType, const char *ibDev);
MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int remoteRank, int tag, void* localBuff,
uint64_t buffSize, mscclppTransport_t transportType, const char* ibDev);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize,
mscclppTransport_t transportType, const char *ibDev)
mscclppTransport_t transportType, const char* ibDev)
{
if (comm->nConns == MAXCONNECTIONS) {
WARN("Too many connections made");
return mscclppInternalError;
}
struct mscclppConn *conn = &comm->conns[comm->nConns];
struct mscclppConn* conn = &comm->conns[comm->nConns];
conn->transport = transportType;
conn->buffSize = buffSize;
@@ -333,12 +357,12 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
}
// Set the ib context for this conn
conn->ibCtx = comm->ibContext[ibDevIdx];
} else if (transportType == mscclppTransportP2P){
} else if (transportType == mscclppTransportP2P) {
// Check if a DMA context/stream exists
if (comm->stream == NULL){
if (comm->stream == NULL) {
CUDACHECK(cudaStreamCreateWithFlags(&comm->stream, cudaStreamNonBlocking));
}
} else if (transportType == mscclppTransportSHM){
} else if (transportType == mscclppTransportSHM) {
WARN("Shared memory interconnection is not implemented yet!");
return mscclppInternalError;
} else {
@@ -346,44 +370,44 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
return mscclppInvalidUsage;
}
// Find/create a proxy state for the given connection
struct mscclppProxyState *proxyState = NULL;
struct mscclppProxyState* proxyState = NULL;
// First see if there is a matching context
// If not, find the first empty proxy
int firstEmptyProxyIndex = -1;
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState *curProxy = comm->proxyState[i];
if (curProxy && (curProxy->transportType == transportType)){
if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) || (transportType == mscclppTransportP2P)){
struct mscclppProxyState* curProxy = comm->proxyState[i];
if (curProxy && (curProxy->transportType == transportType)) {
if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) ||
(transportType == mscclppTransportP2P)) {
proxyState = curProxy;
break; // we found the matching context
}
}
if (curProxy == NULL && firstEmptyProxyIndex == -1){
if (curProxy == NULL && firstEmptyProxyIndex == -1) {
firstEmptyProxyIndex = i;
}
}
if (proxyState == NULL && firstEmptyProxyIndex == -1){
if (proxyState == NULL && firstEmptyProxyIndex == -1) {
WARN("Too many proxies have been allocated!");
return mscclppInvalidUsage;
}
// If we couldn't find a matching context, create one
if (proxyState == NULL){
if (proxyState == NULL) {
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->triggerFifo.hostPtr, &proxyState->triggerFifo.devPtr,
MSCCLPP_PROXY_FIFO_SIZE, &proxyState->triggerFifo.desc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoHead.hostPtr, &proxyState->fifoHead.devPtr,
1, &proxyState->fifoHead.desc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoTail.hostPtr, &proxyState->fifoTail.devPtr,
1, &proxyState->fifoTail.desc));
MSCCLPPCHECK(
mscclppGdrCudaCalloc(&proxyState->fifoHead.hostPtr, &proxyState->fifoHead.devPtr, 1, &proxyState->fifoHead.desc));
MSCCLPPCHECK(
mscclppGdrCudaCalloc(&proxyState->fifoTail.hostPtr, &proxyState->fifoTail.devPtr, 1, &proxyState->fifoTail.desc));
if (transportType == mscclppTransportIB){
if (transportType == mscclppTransportIB) {
proxyState->ibContext = conn->ibCtx;
proxyState->stream = NULL;
} else if (transportType == mscclppTransportP2P){
} else if (transportType == mscclppTransportP2P) {
proxyState->ibContext = NULL;
proxyState->stream = comm->stream;
}
@@ -395,8 +419,8 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
WARN("Proxy allocation failed!");
return mscclppInternalError;
}
struct mscclppDevConn *devConn = &comm->devConns[comm->nConns];
struct mscclppDevConn* devConn = &comm->devConns[comm->nConns];
conn->devConn = devConn;
conn->devConn->localBuff = localBuff;
@@ -415,7 +439,8 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
return mscclppSuccess;
}
struct connInfo {
struct connInfo
{
cudaIpcMemHandle_t handleBuff;
cudaIpcMemHandle_t handleFlag;
cudaIpcMemHandle_t handleProxyFlag;
@@ -425,12 +450,13 @@ struct connInfo {
mscclppIbMrInfo infoProxyFlagMr;
};
mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){
if (connInfo == NULL || conn == NULL){
mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/)
{
if (connInfo == NULL || conn == NULL) {
WARN("connInfo or connection cannot be null");
return mscclppInternalError;
}
struct mscclppDevConn *devConn = conn->devConn;
struct mscclppDevConn* devConn = conn->devConn;
MSCCLPPCHECK(mscclppCudaCalloc(&devConn->proxyEpochId, 1));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->proxyEpochId));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, devConn->localBuff));
@@ -438,28 +464,33 @@ mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*outpu
return mscclppSuccess;
}
mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){
if (connInfo == NULL || conn == NULL){
mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/)
{
if (connInfo == NULL || conn == NULL) {
WARN("ipcHandles or connection cannot be null");
return mscclppInternalError;
}
CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteBuff, connInfo->handleBuff, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteFlag, connInfo->handleFlag, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->remoteProxyFlag, connInfo->handleProxyFlag, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(
cudaIpcOpenMemHandle((void**)&conn->devConn->remoteBuff, connInfo->handleBuff, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(
cudaIpcOpenMemHandle((void**)&conn->devConn->remoteFlag, connInfo->handleFlag, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(
cudaIpcOpenMemHandle((void**)&conn->remoteProxyFlag, connInfo->handleProxyFlag, cudaIpcMemLazyEnablePeerAccess));
return mscclppSuccess;
}
mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){
if (connInfo == NULL || conn == NULL){
mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/)
{
if (connInfo == NULL || conn == NULL) {
WARN("connInfo or connection cannot be null");
return mscclppInternalError;
}
struct mscclppDevConn *devConn = conn->devConn;
struct mscclppDevConn* devConn = conn->devConn;
devConn->remoteBuff = NULL;
devConn->remoteFlag = NULL;
MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuProxyFlag, &devConn->proxyEpochId, 1, &conn->cpuProxyFlagGdrDesc));
struct mscclppIbContext *ibCtx = conn->ibCtx;
struct mscclppIbContext* ibCtx = conn->ibCtx;
if (conn->ibQp == NULL) {
MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp));
}
@@ -474,8 +505,9 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
return mscclppSuccess;
}
mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){
if (connInfo == NULL || conn == NULL){
mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/)
{
if (connInfo == NULL || conn == NULL) {
WARN("ipcHandles or connection cannot be null");
return mscclppInternalError;
}
@@ -498,7 +530,7 @@ mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
{
// Send info to peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppConn* conn = &comm->conns[i];
struct connInfo cInfo;
if (conn->transport == mscclppTransportP2P) {
@@ -512,7 +544,7 @@ mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
// Recv info from peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppConn* conn = &comm->conns[i];
struct connInfo cInfo;
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo)));
if (conn->transport == mscclppTransportP2P) {
@@ -564,8 +596,8 @@ mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size)
return mscclppSuccess;
}
MSCCLPP_API(void, mscclppDefaultLogHandler, int level, unsigned long flags, const char *msg);
void mscclppDefaultLogHandler(int level, unsigned long flags, const char *msg)
MSCCLPP_API(void, mscclppDefaultLogHandler, int level, unsigned long flags, const char* msg);
void mscclppDefaultLogHandler(int level, unsigned long flags, const char* msg)
{
mscclppDebugDefaultLogHandler(level, flags, msg);
}