Cleanup old files and functions (#86)

This commit is contained in:
Changho Hwang
2023-06-01 17:34:57 +08:00
committed by GitHub
parent 457c422791
commit 9cee6c4a74
41 changed files with 657 additions and 2030 deletions

View File

@@ -32,7 +32,7 @@ option(INSTALL_GTEST OFF)
FetchContent_MakeAvailable(googletest)
include(GoogleTest)
set(CLANG_FORMAT_SOURCE_DIRS include src tests)
set(CLANG_FORMAT_SOURCE_DIRS include src test)
include(${PROJECT_SOURCE_DIR}/cmake/AddClangFormatTargets.cmake)
add_library(mscclpp SHARED)

View File

@@ -829,7 +829,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched.
INPUT = ../src/include
INPUT = ../include/mscclpp
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

View File

@@ -6,6 +6,7 @@
#include <chrono>
#include <cstdio>
#include <cstring>
#include <mscclpp/errors.hpp>
#include <string>
namespace mscclpp {
@@ -42,7 +43,7 @@ inline std::string getHostName(int maxlen, const char delim) {
std::string hostname(maxlen + 1, '\0');
if (gethostname(const_cast<char*>(hostname.data()), maxlen) != 0) {
std::strncpy(const_cast<char*>(hostname.data()), "unknown", maxlen);
throw;
throw Error("gethostname failed", ErrorCode::SystemError);
}
int i = 0;
while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++;

View File

@@ -11,10 +11,9 @@
#include <vector>
#include "api.h"
#include "checks.h"
#include "checks_internal.hpp"
#include "socket.h"
#include "utils.h"
#include "utils_internal.hpp"
using namespace mscclpp;
@@ -115,7 +114,7 @@ UniqueId Bootstrap::Impl::getUniqueId() const {
UniqueId Bootstrap::Impl::createUniqueId() {
netInit("");
MSCCLPPTHROW(getRandomData(&uniqueId_.magic, sizeof(uniqueId_.magic)));
getRandomData(&uniqueId_.magic, sizeof(uniqueId_.magic));
std::memcpy(&uniqueId_.addr, &netIfAddr_, sizeof(mscclppSocketAddress));
bootstrapCreateRoot();
return getUniqueId();

View File

@@ -13,10 +13,10 @@
#include <string.h>
#include <unistd.h>
#include "checks.h"
#include "config.h"
#include "checks_internal.hpp"
#include "config.hpp"
#include "debug.h"
#include "utils.h"
#include "utils_internal.hpp"
static mscclppResult_t socketProgressOpt(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset,
int block, int* closed) {
@@ -112,12 +112,12 @@ static int findInterfaces(const char* prefixList, char* names, union mscclppSock
#ifdef ENABLE_TRACE
char line[SOCKET_NAME_MAXLEN + 1];
#endif
struct netIf userIfs[MAX_IFS];
struct mscclpp::netIf userIfs[MAX_IFS];
bool searchNot = prefixList && prefixList[0] == '^';
if (searchNot) prefixList++;
bool searchExact = prefixList && prefixList[0] == '=';
if (searchExact) prefixList++;
int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);
int nUserIfs = mscclpp::parseStringList(prefixList, userIfs, MAX_IFS);
int found = 0;
struct ifaddrs *interfaces, *interface;
@@ -142,7 +142,7 @@ static int findInterfaces(const char* prefixList, char* names, union mscclppSock
}
// check against user specified interfaces
if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
if (!(mscclpp::matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
continue;
}
@@ -264,9 +264,9 @@ mscclppResult_t mscclppSocketGetAddrFromString(union mscclppSocketAddress* ua, c
bool ipv6 = ip_port_pair[0] == '[';
/* Construct the sockaddress structure */
if (!ipv6) {
struct netIf ni;
struct mscclpp::netIf ni;
// parse <ip_or_hostname>:<port> string, expect one pair
if (parseStringList(ip_port_pair, &ni, 1) != 1) {
if (mscclpp::parseStringList(ip_port_pair, &ni, 1) != 1) {
WARN("Net : No valid <IPv4_or_hostname>:<port> pair found");
return mscclppInvalidArgument;
}
@@ -422,13 +422,13 @@ mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSo
static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) {
static bool timeInitialized = false;
static mscclppTime_t initTime;
static mscclpp::TimePoint initTime;
if (!timeInitialized) {
timeInitialized = true;
initTime = getClock();
initTime = mscclpp::getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
mscclpp::Config* config = mscclpp::Config::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
socklen_t socklen = sizeof(union mscclppSocketAddress);
sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen);
@@ -439,7 +439,7 @@ static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) {
WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno);
timeInitialized = false;
return mscclppSystemError;
} else if (elapsedClock(getClock(), initTime) > acceptTimeout) {
} else if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) {
WARN("socketTryAccept: exceeded timeout (%ld) sec", acceptTimeout);
timeInitialized = false;
return mscclppRemoteError;
@@ -483,13 +483,13 @@ static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock) {
static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) {
static bool timeInitialized = false;
static mscclppTime_t initTime;
static mscclpp::TimePoint initTime;
if (!timeInitialized) {
timeInitialized = true;
initTime = getClock();
initTime = mscclpp::getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
mscclpp::Config* config = mscclpp::Config::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
/* blocking/non-blocking connect() is determined by asyncFlag. */
@@ -502,7 +502,7 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) {
sock->state = mscclppSocketStateConnectPolling;
return mscclppSuccess;
} else if (errno == ECONNREFUSED || errno == ETIMEDOUT) {
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) {
WARN("socketStartConnect: exceeded timeout (%ld) sec", acceptTimeout);
sock->state = mscclppSocketStateError;
timeInitialized = false;
@@ -522,13 +522,13 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) {
static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) {
static bool timeInitialized = false;
static mscclppTime_t initTime;
static mscclpp::TimePoint initTime;
if (!timeInitialized) {
timeInitialized = true;
initTime = getClock();
initTime = mscclpp::getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
mscclpp::Config* config = mscclpp::Config::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
struct pollfd pfd;
@@ -552,7 +552,7 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) {
timeInitialized = false;
sock->state = mscclppSocketStateConnected;
} else if (ret == ECONNREFUSED || ret == ETIMEDOUT) {
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) {
WARN("socketPollConnect: exceeded timeout (%ld) sec", acceptTimeout);
sock->state = mscclppSocketStateError;
return mscclppRemoteError;

View File

@@ -1,5 +1,4 @@
#include "api.h"
#include "config.h"
#include "debug.h"
#include "mscclpp.h"
@@ -9,12 +8,6 @@ MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler) {
return mscclppDebugSetLogHandler(handler);
}
MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout) {
mscclppConfig* config = mscclppConfig::getInstance();
config->setBootstrapConnectionTimeoutConfig(timeout);
return mscclppSuccess;
}
MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code) {
switch (code) {
case mscclppSuccess:

View File

@@ -3,7 +3,7 @@
#include "api.h"
#include "checks_internal.hpp"
#include "debug.h"
#include "utils.h"
#include "numa.hpp"
namespace mscclpp {
namespace channel {
@@ -13,12 +13,12 @@ MSCCLPP_API_CPP DeviceChannelService::DeviceChannelService(Communicator& communi
proxy_([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
MSCCLPPTHROW(getDeviceNumaNode(cudaDevice, &deviceNumaNode));
deviceNumaNode = getDeviceNumaNode(cudaDevice);
}
MSCCLPP_API_CPP void DeviceChannelService::bindThread() {
if (deviceNumaNode >= 0) {
MSCCLPPTHROW(numaBind(deviceNumaNode));
numaBind(deviceNumaNode);
INFO(MSCCLPP_INIT, "NUMA node of DeviceChannelService proxy thread is set to %d", deviceNumaNode);
}
}

View File

@@ -8,7 +8,7 @@
#include "connection.hpp"
#include "debug.h"
#include "registered_memory.hpp"
#include "utils.h"
#include "utils_internal.hpp"
namespace mscclpp {

View File

@@ -1,9 +1,11 @@
#include "config.h"
#include "config.hpp"
mscclppConfig mscclppConfig::_instance;
namespace mscclpp {
Config Config::instance_;
mscclppConfig* mscclppConfig::getInstance() { return &_instance; }
Config* Config::getInstance() { return &instance_; }
time_t mscclppConfig::getBootstrapConnectionTimeoutConfig() { return bootstrapConnectionTimeout; }
time_t Config::getBootstrapConnectionTimeoutConfig() { return bootstrapConnectionTimeout; }
void mscclppConfig::setBootstrapConnectionTimeoutConfig(time_t timeout) { bootstrapConnectionTimeout = timeout; }
void Config::setBootstrapConnectionTimeoutConfig(time_t timeout) { bootstrapConnectionTimeout = timeout; }
} // namespace mscclpp

View File

@@ -12,6 +12,8 @@
#include <sys/syscall.h>
#include <unistd.h>
#include <mscclpp/utils.hpp>
int mscclppDebugLevel = -1;
static int pid = -1;
static char hostname[1024];
@@ -100,7 +102,7 @@ void mscclppDebugInit() {
}
// Cache pid and hostname
getHostName(hostname, 1024, '.');
strncpy(hostname, mscclpp::getHostName(1024, '.').c_str(), 1024);
pid = getpid();
/* Parse and expand the MSCCLPP_DEBUG_FILE path and

View File

@@ -8,6 +8,7 @@
#include <cstdlib>
#include <cstring>
#include <mscclpp/core.hpp>
#include <mscclpp/fifo.hpp>
#include <sstream>
#include <string>

View File

@@ -1,41 +0,0 @@
/*************************************************************************
* Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef NCCL_ALIGN_H_
#define NCCL_ALIGN_H_
#define DIVUP(x, y) (((x) + (y)-1) / (y))
#define ROUNDUP(x, y) (DIVUP((x), (y)) * (y))
#define ALIGN_SIZE(size, align) size = ((size + (align)-1) / (align)) * (align);
#if !__CUDA_ARCH__
#ifndef __host__
#define __host__
#endif
#ifndef __device__
#define __device__
#endif
#endif
template <typename X, typename Y, typename Z = decltype(X() + Y())>
__host__ __device__ constexpr Z divUp(X x, Y y) {
return (x + y - 1) / y;
}
template <typename X, typename Y, typename Z = decltype(X() + Y())>
__host__ __device__ constexpr Z roundUp(X x, Y y) {
return (x + y - 1) - (x + y - 1) % y;
}
// assumes second argument is a power of 2
template <typename X, typename Z = decltype(X() + int())>
__host__ __device__ constexpr Z alignUp(X x, int a) {
return (x + a - 1) & Z(-a);
}
#endif

View File

@@ -1,188 +0,0 @@
/*************************************************************************
* Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_CHECKS_H_
#define MSCCLPP_CHECKS_H_
#include <cuda_runtime.h>
#include "debug.h"
// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
WARN("Cuda failure '%s'", cudaGetErrorString(err)); \
return mscclppUnhandledCudaError; \
} \
} while (false)
#define CUDACHECKNORET(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
WARN("Cuda failure '%s'", cudaGetErrorString(err)); \
return; \
} \
} while (false)
#define CUDACHECKGOTO(cmd, res, label) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
WARN("Cuda failure '%s'", cudaGetErrorString(err)); \
res = mscclppUnhandledCudaError; \
goto label; \
} \
} while (false)
// Report failure but clear error and continue
#define CUDACHECKIGNORE(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
INFO(MSCCLPP_ALL, "%s:%d Cuda failure '%s'", __FILE__, __LINE__, cudaGetErrorString(err)); \
(void)cudaGetLastError(); \
} \
} while (false)
#include <errno.h>
// Check system calls
#define SYSCHECK(call, name) \
do { \
int retval; \
SYSCHECKVAL(call, name, retval); \
} while (false)
#define SYSCHECKVAL(call, name, retval) \
do { \
SYSCHECKSYNC(call, name, retval); \
if (retval == -1) { \
WARN("Call to " name " failed : %s", strerror(errno)); \
return mscclppSystemError; \
} \
} while (false)
#define SYSCHECKSYNC(call, name, retval) \
do { \
retval = call; \
if (retval == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) { \
INFO(MSCCLPP_ALL, "Call to " name " returned %s, retrying", strerror(errno)); \
} else { \
break; \
} \
} while (true)
#define SYSCHECKGOTO(statement, res, label) \
do { \
if ((statement) == -1) { \
/* Print the back trace*/ \
res = mscclppSystemError; \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
#define NEQCHECK(statement, value) \
do { \
if ((statement) != value) { \
/* Print the back trace*/ \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \
return mscclppSystemError; \
} \
} while (0);
#define NEQCHECKGOTO(statement, value, res, label) \
do { \
if ((statement) != value) { \
/* Print the back trace*/ \
res = mscclppSystemError; \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
#define EQCHECK(statement, value) \
do { \
if ((statement) == value) { \
/* Print the back trace*/ \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \
return mscclppSystemError; \
} \
} while (0);
#define EQCHECKGOTO(statement, value, res, label) \
do { \
if ((statement) == value) { \
/* Print the back trace*/ \
res = mscclppSystemError; \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
// Propagate errors up
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
#define MSCCLPPCHECKGOTO(call, res, label) \
do { \
res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
#define MSCCLPPWAIT(call, cond, abortFlagPtr) \
do { \
volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
return mscclppInternalError; \
} \
if (tmpAbortFlag) NEQCHECK(*tmpAbortFlag, 0); \
} while (!(cond));
#define MSCCLPPWAITGOTO(call, cond, abortFlagPtr, res, label) \
do { \
volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \
res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
if (tmpAbortFlag) NEQCHECKGOTO(*tmpAbortFlag, 0, res, label); \
} while (!(cond));
#define MSCCLPPCHECKTHREAD(a, args) \
do { \
if (((args)->ret = (a)) != mscclppSuccess && (args)->ret != mscclppInProgress) { \
INFO(MSCCLPP_INIT, "%s:%d -> %d [Async thread]", __FILE__, __LINE__, (args)->ret); \
return args; \
} \
} while (0)
#define CUDACHECKTHREAD(a) \
do { \
if ((a) != cudaSuccess) { \
INFO(MSCCLPP_INIT, "%s:%d -> %d [Async thread]", __FILE__, __LINE__, args->ret); \
args->ret = mscclppUnhandledCudaError; \
return args; \
} \
} while (0)
#endif

View File

@@ -3,6 +3,8 @@
#include <mscclpp/checks.hpp>
#include "debug.h"
#define MSCCLPPTHROW(call) \
do { \
mscclppResult_t res = call; \
@@ -18,4 +20,80 @@
} \
} while (false)
// Check system calls
#define SYSCHECK(call, name) \
do { \
int retval; \
SYSCHECKVAL(call, name, retval); \
} while (false)
#define SYSCHECKVAL(call, name, retval) \
do { \
SYSCHECKSYNC(call, name, retval); \
if (retval == -1) { \
WARN("Call to " name " failed : %s", strerror(errno)); \
return mscclppSystemError; \
} \
} while (false)
#define SYSCHECKSYNC(call, name, retval) \
do { \
retval = call; \
if (retval == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) { \
INFO(MSCCLPP_ALL, "Call to " name " returned %s, retrying", strerror(errno)); \
} else { \
break; \
} \
} while (true)
#define SYSCHECKGOTO(statement, res, label) \
do { \
if ((statement) == -1) { \
/* Print the back trace*/ \
res = mscclppSystemError; \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
#define EQCHECK(statement, value) \
do { \
if ((statement) == value) { \
/* Print the back trace*/ \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \
return mscclppSystemError; \
} \
} while (0);
#define EQCHECKGOTO(statement, value, res, label) \
do { \
if ((statement) == value) { \
/* Print the back trace*/ \
res = mscclppSystemError; \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
// Propagate errors up
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
#define MSCCLPPCHECKGOTO(call, res, label) \
do { \
res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
goto label; \
} \
} while (0);
#endif

View File

@@ -9,7 +9,6 @@
#include <unordered_map>
#include "ib.hpp"
#include "mscclpp.h"
namespace mscclpp {

View File

@@ -3,20 +3,22 @@
#include <time.h>
class mscclppConfig {
namespace mscclpp {
class Config {
public:
time_t bootstrapConnectionTimeout = 30;
static mscclppConfig* getInstance();
static Config* getInstance();
time_t getBootstrapConnectionTimeoutConfig();
void setBootstrapConnectionTimeoutConfig(time_t timeout);
private:
mscclppConfig() = default;
mscclppConfig(const mscclppConfig&) = delete;
mscclppConfig& operator=(const mscclppConfig&) = delete;
Config() = default;
Config(const Config&) = delete;
Config& operator=(const Config&) = delete;
static mscclppConfig _instance;
static Config instance_;
};
} // namespace mscclpp
#endif // end include guard

View File

@@ -46,7 +46,6 @@ extern int mscclppDebugLevel;
extern uint64_t mscclppDebugMask;
extern pthread_mutex_t mscclppDebugLock;
extern FILE* mscclppDebugFile;
extern mscclppResult_t getHostName(char* hostname, int maxlen, const char delim);
void mscclppDebugDefaultLogHandler(const char* msg);
void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char* filefunc, int line, const char* fmt,

View File

@@ -1,237 +1,12 @@
#ifndef MSCCLPP_H_
#define MSCCLPP_H_
#define MSCCLPP_MAJOR 0
#define MSCCLPP_MINOR 1
#define MSCCLPP_PATCH 0
#define MSCCLPP_VERSION (MSCCLPP_MAJOR * 10000 + MSCCLPP_MINOR * 100 + MSCCLPP_PATCH)
// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
#define MSCCLPP_PROXY_FIFO_SIZE 128
#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4
#include <mscclppfifo.h>
#include <vector>
// #includa <cuda_runtime.h>
// TODO: deprecate this file
#ifdef __cplusplus
extern "C" {
#endif
struct alignas(16) mscclppDevConnSignalEpochId {
// every signal(), increaments this and either:
// 1) proxy thread pushes it to the remote peer's localSignalEpochId->proxy
// 2) gpu thread directly writes it to remoteSignalEpochId->device
uint64_t device;
// signal() function triggers the cpu proxy thread to write to it
uint64_t proxy;
};
using mscclppBufferHandle_t = uint32_t;
/***************************************************************************************************************
* A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand.
* The communication API is one-sided meaning that for every single data transfer, only one side
* needs to execute unlike a two-sided communication stack such as NCCL where both sides
* need to execute a send and a receive instruction, respectively, for every transfer.
*
* A connection is uniquely identified by the (remoteRank, tag) pair at an endpoint.
* The two endpoints register buffers of the same size with the connection.
*
* The endpoints provide the remoteRank, tag, and the buffer when registering a connection with msccppConnect().
*
* mscllppConnectionSetup() sets up all the registered connections.
*
***************************************************************************************************************
* A proxy thread running on the CPU is necessary to perform transfers using InfiniBand or the DMA engine.
* The current implementation uses a single proxy thread per context - one IB connection or DMA engine per node.
* Thus multiple threadblocks using different connections might use the same CPU proxy thread.
*
* Before using any of functionality of connections, mscclppProxyLaunch needs to be called to spawn the
* proxy threads. There are currently two types of connections:
*
* P2P via NVLink: the DMA engine can perform the copy between the buffers. DMA engine has higher latency
* but has a higher bandwidth and costs no compute cycles on the GPU.
*
* InfiniBand: the RDMA engine copies the data over MLX devices.
*
***************************************************************************************************************
* At the runtime, a GPU kernel has access to a mscclppDevConn object that provides the following functions:
*
* put(): [non-blocking] the sender initiates a data transfer to the receiver.
*
* signal(): [non-blocking] the sender signals the receiver that data is ready to be consumed.
*
* flush(): [blocking] the sender waits for all the data transfers to complete
*
* wait(): [blocking] the receiver waits on the signal() to start reading the data.
*
* The sender should not reuse the buffer till the flush() returns.
* The receiver should only access the data after the wait() returns.
*
* putWithSignal(): the sender initiates a data transfer and signals the receiver that data is ready to be consumed.
* This is an optimized version of a put() followed by a signal().
*
* These functions hide the complexity of syncrhonization between the two GPUs and the CPU proxy thread.
* Example:
*
* // sender GPU
* devConn.put(data1)
* // not OK to write to data1
* devConn.put(data2)
* // not OK to write to data1, data2
* devConn.put(data3) // receiver GPU
* // not OK to write to data1, data2, data3 // not OK to read data1, data2, data3
* devConn.signal() -------------------------------> devConn.wait()
* // not OK to write to data1, data2, data3 // OK to read data1, data2, data3
* devConn.flush()
* // OK to write to data1, data2, data3
*
*
* The two endpoint can concurrently use the same connection provided they are writing (puts) on different
* indices in the registered buffer.
**************************************************************************************************************/
struct mscclppDevConn {
#ifdef __CUDACC__
__forceinline__ __device__ void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) {
fifo.push(mscclppData, dstDataOffset, srcDataOffset, dataSize);
}
__forceinline__ __device__ void put(uint64_t dataOffset, uint64_t dataSize) { put(dataOffset, dataOffset, dataSize); }
__forceinline__ __device__ void signal() {
epochIncrement();
fifo.push(mscclppFlag, 0, 0, 1);
}
__forceinline__ __device__ void putWithSignal(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) {
epochIncrement();
fifo.push(mscclppData | mscclppFlag, dstDataOffset, srcDataOffset, dataSize);
}
__forceinline__ __device__ void putWithSignal(uint64_t dataOffset, uint64_t dataSize) {
putWithSignal(dataOffset, dataOffset, dataSize);
}
__forceinline__ __device__ void putWithSignalAndFlush(uint64_t dstDataOffset, uint64_t srcDataOffset,
uint64_t dataSize) {
epochIncrement();
uint64_t curFifoHead = fifo.push(mscclppData | mscclppFlag | mscclppSync, dstDataOffset, srcDataOffset, dataSize);
while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
__forceinline__ __device__ void putWithSignalAndFlush(uint64_t dataOffset, uint64_t dataSize) {
putWithSignalAndFlush(dataOffset, dataOffset, dataSize);
}
__forceinline__ __device__ void flush() {
uint64_t curFifoHead = fifo.push(mscclppSync, 0, 0, 1);
// we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0.
while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
// Version that uses the SM directly to do the copy, instead of using the proxy thread like the functions above.
__forceinline__ __device__ void putDirect(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize,
uint32_t threadId, uint32_t numThreads) {
uint64_t* src = (uint64_t*)((char*)localBuff + srcDataOffset);
uint64_t* dst = (uint64_t*)((char*)remoteBuff + dstDataOffset);
// assume the memory is aligned to 8 bytes
size_t nElem =
dataSize % sizeof(uint64_t) ? (dataSize + sizeof(uint64_t)) / sizeof(uint64_t) : dataSize / sizeof(uint64_t);
for (size_t i = threadId; i < nElem; i += numThreads) {
dst[i] = src[i];
}
}
__forceinline__ __device__ void putDirect(uint64_t dataOffset, uint64_t dataSize, uint32_t threadId,
uint32_t numThreads) {
putDirect(dataOffset, dataOffset, dataSize, threadId, numThreads);
}
__forceinline__ __device__ void signalDirect() {
// This fence ensures that the writes from a preceding putDirect() are visible on the peer GPU before the
// incremented epoch id is visible.
__threadfence_system();
epochIncrement();
*(volatile uint64_t*)&(remoteSignalEpochId->device) = localSignalEpochId->device;
}
__forceinline__ __device__ void wait() {
(*waitEpochId) += 1;
while (*(volatile uint64_t*)&(localSignalEpochId->proxy) < (*waitEpochId))
;
}
__forceinline__ __device__ void waitDirect() {
(*waitEpochId) += 1;
while (*(volatile uint64_t*)&(localSignalEpochId->device) < (*waitEpochId))
;
}
__forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(localSignalEpochId->device) += 1; }
#endif // __CUDACC__
// this is a concurrent fifo which is multiple threads from the device
// can produce for and the sole proxy thread consumes it.
struct mscclppConcurrentFifo fifo;
int remoteRank;
int tag;
// my local buffer
void* localBuff;
struct mscclppDevConnSignalEpochId* localSignalEpochId;
// used by the signal() function directly from gpu
struct mscclppDevConnSignalEpochId* remoteSignalEpochId;
// every wait(), increaments this and then the gpu waits for either:
// 1) localSignalEpochId->proxy to be >= this in case of a proxy thread
// 2) remoteSignalEpochId->device to be >= this in case of a gpu thread
uint64_t* waitEpochId;
// my remote peer's buffer. only non-NULL with gpu's direct access
// gpu can directly write into it
void* remoteBuff;
};
// Host interface for mscclppDevCon functionality
struct mscclppHostConn {
virtual ~mscclppHostConn() = default;
virtual void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) = 0;
virtual void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset,
uint64_t dataSize) = 0;
virtual void signal() = 0;
virtual void wait() = 0;
virtual void flush() = 0;
};
typedef struct mscclppComm* mscclppComm_t;
typedef struct mscclppDevConn mscclppDevConn_t;
typedef struct mscclppHostConn mscclppHostConn_t;
#define MSCCLPP_UNIQUE_ID_BYTES 128
typedef struct {
char internal[MSCCLPP_UNIQUE_ID_BYTES];
} mscclppUniqueId;
struct mscclppRegisteredMemoryP2P {
void* remoteBuff;
const void* IbMr;
};
struct mscclppRegisteredMemory {
std::vector<mscclppRegisteredMemoryP2P> p2p;
};
/* Error type */
typedef enum {
mscclppSuccess = 0,
@@ -245,68 +20,6 @@ typedef enum {
mscclppNumResults = 8
} mscclppResult_t;
/* Create a unique ID for communication. Only needs to be called by one process.
* Use with mscclppCommInitRankFromId().
* All processes need to provide the same ID to mscclppCommInitRankFromId().
*
* Outputs:
* uniqueId: the unique ID to be created
*/
mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* uniqueId);
/* Transport Types */
typedef enum {
mscclppTransportP2P = 0,
mscclppTransportSHM = 1, // TODO(chhwang): not implemented yet
mscclppTransportIB = 2,
} mscclppTransport_t;
/* Initialize a communicator. nranks processes with rank 0 to nranks-1 need to call this function.
*
* Outputs:
* comm: the communicator to be initialized
*
* Inputs:
* nranks: number of ranks in the communicator
* ipPortPair: a string of the form "ip:port" that represents the address of the root process
* rank: rank of the calling process
*/
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank);
/* Initialize a communicator from a given mscclppUniqueId. Same as mscclppCommInitRank() except that
* id is provided by the user by calling mscclppGetUniqueId()
*
* Outputs:
* comm: the communicator to be initialized
*
* Inputs:
* nranks: number of ranks in the communicator
* id: the unique ID to be used for communication
* rank: rank of the calling process
*/
mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank);
/* Ring-based AllGather through the bootstrap socket.
*
* Outputs:
* comm: the communicator
*
* Inputs:
* data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r`
* size: data size per rank
*/
mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size);
/* A no-op function that is used to synchronize all processes via a bootstrap allgather*/
mscclppResult_t mscclppBootstrapBarrier(mscclppComm_t comm);
/* Destroy a communicator.
*
* Inputs:
* comm: the communicator to be destroyed
*/
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm);
/* Return the string for the given error code.
*
* Output:
@@ -317,136 +30,6 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm);
*/
const char* mscclppGetErrorString(mscclppResult_t result);
/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection
* is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection
* from rank i to remote rank j needs to have a counterpart from rank j to rank i.
* Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages
 * and does not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has
* security risks if the devConn's accesses are given to a malicious process.
*
* Inputs:
* comm: the communicator
* remoteRank: the rank of the remote process
* tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be
* used to identify the connection inside a GPU kernel.
* localBuff: the local send/receive buffer
* buffSize: the size of the local buffer
* transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB)
* ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P.
*/
mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize,
mscclppTransport_t transportType, const char* ibDev = 0);
/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection
* is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection
* from rank i to remote rank j needs to have a counterpart from rank j to rank i.
* Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages
* and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has
* security risks if the devConn's accesses are given to a malicious process.
*
* This version does not register a buffer. Buffers should instead be registered with mscclppRegisterBuffer().
*
* Inputs:
* comm: the communicator
* remoteRank: the rank of the remote process
* tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be
* used to identify the connection inside a GPU kernel.
* transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB)
* ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P.
*/
mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag,
mscclppTransport_t transportType, const char* ibDev = 0);
/* Register a buffer for use with a connection.
*
* Inputs:
* comm: the communicator
* connIdx: the index of the connection by order of calls to mscclppConnect/mscclppConnectWithoutBuffer
* localBuff: the local send/receive buffer
* buffSize: the size of the local buffer
*
* Outputs:
* handle: a handle to the buffer registration
*/
mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, uint64_t buffSize,
mscclppBufferHandle_t* handle);
/* Establish all connections declared by mscclppConnect(). This function must be called after all mscclppConnect()
* calls are made. This function ensures that all remote ranks are ready to communicate when it returns.
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm);
/* Return an array of mscclppDevConn_t and the number of connections created by mscclppConnectionSetup().
* The order of connections matches the order of mscclppConnect() calls.
*
* Outputs:
* devConns: the array of mscclppDevConn_t. Each mscclppDevConn_t corresponds to a mscclppConnect() call in the
* order of the calls.
* nConns: the number of connections
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns);
/* Return the mscclppDevConn_t corresponding to a given tag and a remoteRank.
*
* Outputs:
* devConn: the mscclppDevConn_t corresponding to the given tag
*
* Inputs:
* comm: the communicator
* tag: the tag of the connection
* remoteRank: the remoteRank of the connection
*/
mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn);
/* Launch proxy threads for all connections created by mscclppConnectionSetup(). This function is supposed to be called
* before starting a kernel that uses mscclppDevConn_t. Up to two proxy threads are launched for each (GPU + IB) pair
* (one for P2P NVLink and one for InfiniBand).
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm);
/* Stop all proxy threads.
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppProxyStop(mscclppComm_t comm);
/* Return the rank of the calling process.
*
* Outputs:
* rank: the rank of the calling process
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppCommRank(mscclppComm_t comm, int* rank);
/* Return the number of ranks of the communicator.
*
* Outputs:
* size: the number of ranks of the communicator
*
* Inputs:
* comm: the communicator
*/
mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size);
/* Set the timeout for the bootstrap connection.
*
* Inputs:
* timeout: the timeout in seconds
*/
mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout);
/* Log handler type which is a callback function for
* however user likes to handle the log messages. Once set,
* the logger will just call this function with msg.
@@ -467,33 +50,6 @@ void mscclppDefaultLogHandler(const char* msg);
*/
mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler);
/* Register a buffer for RDMA.
*
* Outputs:
* regMem: the registered memory
*
* Inputs:
* comm: the communicator
* local_memory: the local buffer to be registered
* size: the size of the buffer
*/
mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size,
mscclppRegisteredMemory* regMem);
/* Write to a registered buffer.
*
* Inputs:
* comm: the communicator
* regMem: the registered memory
* srcBuff: the source buffer
* size: the size of the buffer
* srcOffset: the offset of the source buffer
* dstOffset: the offset of the destination buffer
* stream: the CUDA stream
*/
mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff,
size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream);
#ifdef __cplusplus
} // end extern "C"
#endif

View File

@@ -1,78 +0,0 @@
#ifndef MSCCLPPFIFO_H_
#define MSCCLPPFIFO_H_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum : uint64_t { mscclppData = 0x1, mscclppFlag = 0x2, mscclppSync = 0x4 } mscclppTriggerType_t;
#define MSCCLPP_BITS_SIZE 32
#define MSCCLPP_BITS_OFFSET 32
#define MSCCLPP_BITS_TYPE 3
#define MSCCLPP_BITS_CONNID 10
// Basic structure of each work element ("trigger") in the fifo.
// The summation of number of bits must be 128 or less, so one trigger fits in
// a single 16-byte, 16-byte-aligned element.
union alignas(16) mscclppTrigger {
  uint64_t value[2];  // raw view: the two 64-bit halves written together by push()
  struct {
    // first 64 bits: value[0]
    uint64_t dataSize : MSCCLPP_BITS_SIZE;         // transfer size in bytes
    uint64_t srcDataOffset : MSCCLPP_BITS_OFFSET;  // byte offset into the source buffer
    uint64_t : (64 - MSCCLPP_BITS_SIZE - MSCCLPP_BITS_OFFSET);  // ensure 64-bit alignment
    // second 64 bits: value[1]
    uint64_t dstDataOffset : MSCCLPP_BITS_OFFSET;  // byte offset into the destination buffer
    uint64_t connId : MSCCLPP_BITS_CONNID;         // connection id (see mscclppConcurrentFifo::connId)
    uint64_t type : MSCCLPP_BITS_TYPE;             // mscclppTriggerType_t flag bits
    uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_CONNID - MSCCLPP_BITS_TYPE);  // ensure 64-bit alignment
  } fields;
};
typedef mscclppTrigger* mscclppTrigger_t;
/* This is a concurrent fifo where multiple device threads can push mscclppTrigger work elements to
* and a single host proxy thread consumes these work elements. There is a head pointer allocated on device
* which starts with 0 and goes to 2^64-1 which is almost infinity. There are two copies of tail, one
 * that is on the device (triggerFifoTail) and another that is on the host (proxyState->fifoTailHost).
* The host always has the "true" tail and occasionally, pushes it to the copy on the device.
* Therefore, most of the time, the device has a stale version. The invariants are:
* triggerFifoTail <= proxyState->fifoTailHost <= triggerFifoHead.
* push() function increments triggerFifoHead, proxyState->fifoTailHost is updated in proxy.cc:mscclppProxyService
* and it occasionally flushes it to triggerFifoTail via a cudaMemcpyAsync.
*
 * Why is duplicating the tail a good idea? The fifo is large enough and we do not need frequent updates
* for the tail as there is usually enough space for device threads to push their work into.
*/
struct mscclppConcurrentFifo {
#ifdef __CUDACC__
  // Push one work element into the fifo from the device side and return the
  // fifo slot index (the head value this thread claimed). Safe for concurrent
  // callers: the slot is claimed with an atomic increment of triggerFifoHead.
  __forceinline__ __device__ uint64_t push(uint64_t type, uint64_t dstDataOffset, uint64_t srcDataOffset,
                                           uint64_t dataSize) {
    uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->triggerFifoHead, 1);
    // Spin until the claimed slot falls within the fifo window. triggerFifoTail
    // is a possibly-stale device copy of the host's tail (see file comment), so
    // this may spin longer than strictly necessary but never too little.
    while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->triggerFifoTail))
      ;
    // Spin until the previous occupant of this slot is cleared (slot value
    // becomes 0 -- presumably the host zeroes a trigger after consuming it;
    // TODO(review): confirm against the proxy-side consumer).
    while (*(volatile uint64_t*)&this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0)
      ;
    uint64_t* valptr = (uint64_t*)&(this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE].value);
    // Publish both 64-bit halves of the trigger with one volatile vector store:
    // value[0] = (srcDataOffset << SIZE) | dataSize,
    // value[1] = (((type << CONNID) | connId) << OFFSET) | dstDataOffset,
    // matching the mscclppTrigger bitfield layout.
    asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(valptr),
                 "l"((srcDataOffset << MSCCLPP_BITS_SIZE) + dataSize),
                 "l"((((type << MSCCLPP_BITS_CONNID) + this->connId) << MSCCLPP_BITS_OFFSET) + dstDataOffset));
    return curFifoHead;
  }
#endif  // __CUDACC__
  mscclppTrigger* triggerFifo;  // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements
  uint64_t* triggerFifoTail;    // Allocated on device. proxyState->fifoTailHost is the true tail on host and pushed
                                // occasionally to device
  uint64_t* triggerFifoHead;  // Allocated on device. Only accessed by device
  int connId;                 // copied into fields.connId of every pushed trigger
};
#ifdef __cplusplus
} // end extern "C"
#endif
#endif // MSCCLPPFIFO_H_

11
src/include/numa.hpp Normal file
View File

@@ -0,0 +1,11 @@
#ifndef MSCCLPP_NUMA_HPP_
#define MSCCLPP_NUMA_HPP_
namespace mscclpp {
// Returns the NUMA node of the PCI device backing CUDA device `cudaDev`,
// read from /sys/bus/pci/devices/<busId>/numa_node.
// Throws Error(SystemError) if the sysfs entry cannot be opened or parsed.
int getDeviceNumaNode(int cudaDev);
// Binds the calling process's NUMA memory policy to `node` via libnuma.
// Throws Error(InvalidUsage) if `node` is out of range.
void numaBind(int node);
}  // namespace mscclpp
#endif  // MSCCLPP_NUMA_HPP_

View File

@@ -1,70 +0,0 @@
#ifndef MSCCLPP_PROXY_H_
#define MSCCLPP_PROXY_H_
#include <cuda_runtime.h>
#include <pthread.h>
#include <atomic>
#include "comm.h"
#include "mscclpp.h"
#define MSCCLPP_PROXY_MAX_NUM (MSCCLPP_IB_MAX_DEVS + 1) // One is for a P2P proxy.
typedef enum {
MSCCLPP_PROXY_RUN_STATE_IDLE = 0,
MSCCLPP_PROXY_RUN_STATE_RUNNING,
MSCCLPP_PROXY_RUN_STATE_EXITING,
} mscclppProxyRunState_t;
struct mscclppProxyFifo {
mscclppResult_t create();
mscclppResult_t destroy();
mscclppResult_t poll(mscclppTrigger* trigger);
mscclppResult_t pop();
mscclppResult_t flushTail(bool sync = false);
// fifo cudaHostCalloc'ed that is produced by device and consumed by host
mscclppTrigger* triggerFifo;
#if defined(MSCCLPP_USE_GDRCOPY)
mscclppTrigger* triggerFifoDev;
void* triggerFifoDesc;
#endif
// allocated on the device and only accessed by the device
uint64_t* fifoHead;
// allocated on the device. Read-only by device, write-only by host
uint64_t* fifoTailDev;
#if defined(MSCCLPP_USE_GDRCOPY)
uint64_t* fifoTailDevHostPtr;
void* fifoTailDesc;
#endif
// allocated on the host. Only accessed by the host. This is a copy of the
// value pointed to by fifoTailDev and the invariant is that
// *fifoTailDev <= fifoTailHost. Meaning that host's copy of tail is
// always ahead of the device's copy and host updates the device's copy
// only when it is needed. Therefore, fifoTailHost is the "true" tail
// and fifoTailDev is a "stale" tail. See proxy.cc to understand how
// these updates are pushed to the device.
uint64_t fifoTailHost;
// for transferring fifo tail
cudaStream_t stream;
};
struct mscclppProxyState {
mscclppTransport_t transportType;
pthread_t thread;
mscclppProxyRunState_t run;
int numaNodeToBind;
mscclpp::IbCtx* ibContext; // For IB connection only
cudaStream_t p2pStream; // for P2P DMA engine only
struct mscclppProxyFifo fifo;
};
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);
mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm);
#endif

View File

@@ -8,7 +8,6 @@
#include "communicator.hpp"
#include "ib.hpp"
#include "mscclpp.h"
namespace mscclpp {

View File

@@ -1,71 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_UTILS_H_
#define MSCCLPP_UTILS_H_
#include <stdint.h>
#include <stdio.h>
#include <chrono>
#include "mscclpp.h"
// int mscclppCudaCompCap();
// PCI Bus ID <-> int64 conversion functions
mscclppResult_t int64ToBusId(int64_t id, char* busId);
mscclppResult_t busIdToInt64(const char* busId, int64_t* id);
mscclppResult_t getBusId(int cudaDev, int64_t* busId);
mscclppResult_t getDeviceNumaNode(int cudaDev, int* numaNode);
mscclppResult_t getHostName(char* hostname, int maxlen, const char delim);
uint64_t getHash(const char* string, int n);
uint64_t getHostHash();
uint64_t getPidHash();
mscclppResult_t getRandomData(void* buffer, size_t bytes);
struct netIf {
char prefix[64];
int port;
};
int parseStringList(const char* string, struct netIf* ifList, int maxList);
bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact);
static long log2i(long n) {
long l = 0;
while (n >>= 1) l++;
return l;
}
typedef std::chrono::steady_clock::time_point mscclppTime_t;
mscclppTime_t getClock();
int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end);
/* get any bytes of random data from /dev/urandom, return 0 if it succeeds; else
* return -1 */
inline mscclppResult_t getRandomData(void* buffer, size_t bytes) {
mscclppResult_t ret = mscclppSuccess;
if (bytes > 0) {
const size_t one = 1UL;
FILE* fp = fopen("/dev/urandom", "r");
if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) ret = mscclppSystemError;
if (fp) fclose(fp);
}
return ret;
}
mscclppResult_t numaBind(int node);
typedef struct bitmask* mscclppNumaState;
mscclppResult_t getNumaState(mscclppNumaState* state);
mscclppResult_t setNumaState(mscclppNumaState state);
#endif

View File

@@ -0,0 +1,52 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_UTILS_INTERNAL_HPP_
#define MSCCLPP_UTILS_INTERNAL_HPP_
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <mscclpp/utils.hpp>
namespace mscclpp {
// PCI Bus ID <-> int64 conversion functions
std::string int64ToBusId(int64_t id);
int64_t busIdToInt64(const std::string busId);
uint64_t getHash(const char* string, int n);
uint64_t getHostHash();
uint64_t getPidHash();
void getRandomData(void* buffer, size_t bytes);
struct netIf {
char prefix[64];
int port;
};
int parseStringList(const char* string, struct netIf* ifList, int maxList);
bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact);
using TimePoint = std::chrono::steady_clock::time_point;
TimePoint getClock();
int64_t elapsedClock(TimePoint start, TimePoint end);
/* Fill `buffer` with `bytes` bytes of random data read from /dev/urandom.
 * A no-op when `bytes` is 0. Throws Error(ErrorCode::SystemError) if `buffer`
 * is null, the device cannot be opened, or the read comes up short. */
inline void getRandomData(void* buffer, size_t bytes) {
  if (bytes > 0) {
    FILE* fp = fopen("/dev/urandom", "r");
    bool ok = (buffer != NULL) && (fp != NULL) && (fread(buffer, bytes, 1, fp) == 1);
    // Close before throwing: the original threw first and leaked the FILE*
    // on every error path (the `if (fp) fclose(fp);` after it was unreachable).
    if (fp) fclose(fp);
    if (!ok) {
      throw Error("Failed to read random data", ErrorCode::SystemError);
    }
  }
}
} // namespace mscclpp
#endif

51
src/numa.cc Normal file
View File

@@ -0,0 +1,51 @@
#include <numa.h>
#include <fstream>
#include <mscclpp/checks.hpp>
#include <mscclpp/errors.hpp>
// Convert a logical cudaDev index to its lower-case PCI bus ID string
// (e.g. "0000:3b:00.0"), as used by sysfs paths.
// Throws on CUDA API failure via MSCCLPP_CUDATHROW.
static const std::string getBusId(int cudaDev) {
  // On most systems, the PCI bus ID comes back as in the 0000:00:00.0
  // format. Still need to allocate proper space in case PCI domain goes
  // higher.
  char busIdChar[] = "00000000:00:00.0";
  MSCCLPP_CUDATHROW(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev));
  // sysfs wants lower-case hex. Use size_t to avoid the signed/unsigned
  // comparison with sizeof, and stop before the terminating NUL.
  for (size_t i = 0; i + 1 < sizeof(busIdChar); i++) {
    busIdChar[i] = std::tolower(busIdChar[i]);
  }
  return std::string(busIdChar);
}
namespace mscclpp {
// Look up the NUMA node of the PCI device backing CUDA device `cudaDev` by
// reading its sysfs numa_node entry. Throws Error(SystemError) when the
// entry is missing or unparsable.
int getDeviceNumaNode(int cudaDev) {
  const std::string file_str = "/sys/bus/pci/devices/" + getBusId(cudaDev) + "/numa_node";
  std::ifstream file(file_str);
  if (!file.is_open()) {
    throw Error("Failed to open file: " + file_str, ErrorCode::SystemError);
  }
  int numaNode;
  if (!(file >> numaNode)) {
    throw Error("Failed to read NUMA node from file: " + file_str, ErrorCode::SystemError);
  }
  return numaNode;
}
void numaBind(int node) {
int totalNumNumaNodes = numa_num_configured_nodes();
if (node < 0 || node >= totalNumNumaNodes) {
throw Error(
"Invalid NUMA node " + std::to_string(node) + ", must be between 0 and " + std::to_string(totalNumNumaNodes),
ErrorCode::InvalidUsage);
}
nodemask_t mask;
nodemask_zero(&mask);
nodemask_set_compat(&mask, node);
numa_bind_compat(&mask);
}
} // namespace mscclpp

View File

@@ -5,7 +5,6 @@
#include <thread>
#include "api.h"
#include "utils.h"
namespace mscclpp {

View File

@@ -7,7 +7,7 @@
#include "api.h"
#include "checks_internal.hpp"
#include "debug.h"
#include "utils.h"
#include "utils_internal.hpp"
namespace mscclpp {

View File

@@ -1,264 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "utils.h"
#include <cuda_runtime.h>
#include <numa.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory>
#include <string>
#include "checks.h"
// Get current Compute Capability
// int mscclppCudaCompCap() {
// int cudaDev;
// if (cudaGetDevice(&cudaDev) != cudaSuccess) return 0;
// int ccMajor, ccMinor;
// if (cudaDeviceGetAttribute(&ccMajor, cudaDevAttrComputeCapabilityMajor, cudaDev) != cudaSuccess) return 0;
// if (cudaDeviceGetAttribute(&ccMinor, cudaDevAttrComputeCapabilityMinor, cudaDev) != cudaSuccess) return 0;
// return ccMajor*10+ccMinor;
// }
mscclppResult_t int64ToBusId(int64_t id, char* busId) {
sprintf(busId, "%04lx:%02lx:%02lx.%01lx", (id) >> 20, (id & 0xff000) >> 12, (id & 0xff0) >> 4, (id & 0xf));
return mscclppSuccess;
}
mscclppResult_t busIdToInt64(const char* busId, int64_t* id) {
char hexStr[17]; // Longest possible int64 hex string + null terminator.
int hexOffset = 0;
for (int i = 0; hexOffset < sizeof(hexStr) - 1; i++) {
char c = busId[i];
if (c == '.' || c == ':') continue;
if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') || (c >= 'a' && c <= 'f')) {
hexStr[hexOffset++] = busId[i];
} else
break;
}
hexStr[hexOffset] = '\0';
*id = strtol(hexStr, NULL, 16);
return mscclppSuccess;
}
// Convert a logical cudaDev index to the NVML device minor number
mscclppResult_t getBusId(int cudaDev, std::string* busId) {
// On most systems, the PCI bus ID comes back as in the 0000:00:00.0
// format. Still need to allocate proper space in case PCI domain goes
// higher.
char busIdChar[] = "00000000:00:00.0";
CUDACHECK(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev));
// we need the hex in lower case format
for (int i = 0; i < sizeof(busIdChar); i++) {
busIdChar[i] = std::tolower(busIdChar[i]);
}
*busId = busIdChar;
return mscclppSuccess;
}
mscclppResult_t getDeviceNumaNode(int cudaDev, int* numaNode) {
std::string busId;
MSCCLPPCHECK(getBusId(cudaDev, &busId));
std::string pci_str = "/sys/bus/pci/devices/" + busId + "/numa_node";
FILE* file = fopen(pci_str.c_str(), "r");
if (file == NULL) {
WARN("Could not open %s to detect the NUMA node for device %d", pci_str.c_str(), cudaDev);
return mscclppSystemError;
}
int ret = fscanf(file, "%d", numaNode);
if (ret != 1) {
WARN("Could not read NUMA node for device %d", cudaDev);
return mscclppSystemError;
}
fclose(file);
return mscclppSuccess;
}
mscclppResult_t getHostName(char* hostname, int maxlen, const char delim) {
if (gethostname(hostname, maxlen) != 0) {
strncpy(hostname, "unknown", maxlen);
return mscclppSystemError;
}
int i = 0;
while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++;
hostname[i] = '\0';
return mscclppSuccess;
}
uint64_t getHash(const char* string, int n) {
// Based on DJB2a, result = result * 33 ^ char
uint64_t result = 5381;
for (int c = 0; c < n; c++) {
result = ((result << 5) + result) ^ string[c];
}
return result;
}
/* Generate a hash of the unique identifying string for this host
* that will be unique for both bare-metal and container instances
* Equivalent of a hash of;
*
* $(hostname)$(cat /proc/sys/kernel/random/boot_id)
*
* This string can be overridden by using the MSCCLPP_HOSTID env var.
*/
#define HOSTID_FILE "/proc/sys/kernel/random/boot_id"
uint64_t computeHostHash(void) {
char hostHash[1024];
char* hostId;
// Fall back is the full hostname if something fails
(void)getHostName(hostHash, sizeof(hostHash), '\0');
int offset = strlen(hostHash);
if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) {
INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId);
strncpy(hostHash, hostId, sizeof(hostHash));
} else {
FILE* file = fopen(HOSTID_FILE, "r");
if (file != NULL) {
char* p;
if (fscanf(file, "%ms", &p) == 1) {
strncpy(hostHash + offset, p, sizeof(hostHash) - offset - 1);
free(p);
}
}
fclose(file);
}
// Make sure the string is terminated
hostHash[sizeof(hostHash) - 1] = '\0';
TRACE(MSCCLPP_INIT, "unique hostname '%s'", hostHash);
return getHash(hostHash, strlen(hostHash));
}
uint64_t getHostHash(void) {
thread_local std::unique_ptr<uint64_t> hostHash = std::make_unique<uint64_t>(computeHostHash());
return *hostHash;
}
/* Generate a hash of the unique identifying string for this process
* that will be unique for both bare-metal and container instances
* Equivalent of a hash of;
*
* $$ $(readlink /proc/self/ns/pid)
*/
uint64_t getPidHash(void) {
char pname[1024];
// Start off with our pid ($$)
sprintf(pname, "%ld", (long)getpid());
int plen = strlen(pname);
int len = readlink("/proc/self/ns/pid", pname + plen, sizeof(pname) - 1 - plen);
if (len < 0) len = 0;
pname[plen + len] = '\0';
TRACE(MSCCLPP_INIT, "unique PID '%s'", pname);
return getHash(pname, strlen(pname));
}
int parseStringList(const char* string, struct netIf* ifList, int maxList) {
if (!string) return 0;
const char* ptr = string;
int ifNum = 0;
int ifC = 0;
char c;
do {
c = *ptr;
if (c == ':') {
if (ifC > 0) {
ifList[ifNum].prefix[ifC] = '\0';
ifList[ifNum].port = atoi(ptr + 1);
ifNum++;
ifC = 0;
}
while (c != ',' && c != '\0') c = *(++ptr);
} else if (c == ',' || c == '\0') {
if (ifC > 0) {
ifList[ifNum].prefix[ifC] = '\0';
ifList[ifNum].port = -1;
ifNum++;
ifC = 0;
}
} else {
ifList[ifNum].prefix[ifC] = c;
ifC++;
}
ptr++;
} while (ifNum < maxList && c);
return ifNum;
}
static bool matchIf(const char* string, const char* ref, bool matchExact) {
// Make sure to include '\0' in the exact case
int matchLen = matchExact ? strlen(string) + 1 : strlen(ref);
return strncmp(string, ref, matchLen) == 0;
}
static bool matchPort(const int port1, const int port2) {
if (port1 == -1) return true;
if (port2 == -1) return true;
if (port1 == port2) return true;
return false;
}
bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact) {
// Make an exception for the case where no user list is defined
if (listSize == 0) return true;
for (int i = 0; i < listSize; i++) {
if (matchIf(string, ifList[i].prefix, matchExact) && matchPort(port, ifList[i].port)) {
return true;
}
}
return false;
}
mscclppResult_t numaBind(int node) {
int totalNumNumaNodes = numa_num_configured_nodes();
if (node < 0 || node >= totalNumNumaNodes) {
WARN("Invalid NUMA node %d, must be between 0 and %d", node, totalNumNumaNodes);
return mscclppInvalidUsage;
}
nodemask_t mask;
nodemask_zero(&mask);
nodemask_set_compat(&mask, node);
numa_bind_compat(&mask);
return mscclppSuccess;
}
mscclppResult_t getNumaState(mscclppNumaState* state) {
mscclppNumaState state_ = numa_get_run_node_mask();
if (state_ == NULL) {
WARN("Failed to get NUMA node mask of the running process");
return mscclppSystemError;
}
*state = state_;
return mscclppSuccess;
}
mscclppResult_t setNumaState(mscclppNumaState state) {
if (state == NULL) {
WARN("Invalid NUMA state");
return mscclppInvalidUsage;
}
numa_bind(state);
return mscclppSuccess;
}
mscclppTime_t getClock() { return std::chrono::steady_clock::now(); }
int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end) {
return std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
}

174
src/utils_internal.cc Normal file
View File

@@ -0,0 +1,174 @@
#include "utils_internal.hpp"
#include <cuda_runtime.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory>
#include <string>
#include "checks_internal.hpp"
namespace {
// Kernel-provided boot id, used as a stable per-host identifier.
constexpr char HOSTID_FILE[32] = "/proc/sys/kernel/random/boot_id";

// Compare an interface name against a reference prefix. In exact mode the
// terminating '\0' participates in the comparison, so the strings must be
// equal; otherwise `string` only needs to start with `ref`.
bool matchIf(const char* string, const char* ref, bool matchExact) {
  size_t cmpLen = matchExact ? strlen(string) + 1 : strlen(ref);
  return strncmp(string, ref, cmpLen) == 0;
}

// Two ports match when either side is the wildcard (-1) or they are equal.
bool matchPort(const int port1, const int port2) {
  return port1 == -1 || port2 == -1 || port1 == port2;
}
}  // namespace
namespace mscclpp {
/* Format a packed int64 PCI bus ID as "dddd:bb:dd.f" (lower-case hex).
 * Inverse of busIdToInt64(). */
std::string int64ToBusId(int64_t id) {
  char busId[20];
  // snprintf instead of sprintf: for an id with the sign bit set, `id >> 20`
  // sign-extends and "%04lx" prints 16 digits, which overflowed the buffer.
  std::snprintf(busId, sizeof(busId), "%04lx:%02lx:%02lx.%01lx", (id) >> 20, (id & 0xff000) >> 12, (id & 0xff0) >> 4,
                (id & 0xf));
  return std::string(busId);
}
/* Parse a PCI bus ID string (e.g. "0123:45:67.8") back into its packed int64
 * form by concatenating its hex digits. Separators '.' and ':' are skipped;
 * parsing stops at the first other non-hex character or after 16 digits.
 * Inverse of int64ToBusId(). */
int64_t busIdToInt64(const std::string busId) {
  char hexStr[17];  // Longest possible int64 hex string + null terminator.
  size_t hexOffset = 0;  // size_t: avoids signed/unsigned comparisons below
  for (size_t i = 0; hexOffset < sizeof(hexStr) - 1 && i < busId.length(); ++i) {
    char c = busId[i];
    if (c == '.' || c == ':') continue;
    if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') || (c >= 'a' && c <= 'f')) {
      hexStr[hexOffset++] = c;
    } else {
      break;
    }
  }
  hexStr[hexOffset] = '\0';
  // strtoull so 16-digit ids above LONG_MAX round-trip instead of clamping.
  return static_cast<int64_t>(std::strtoull(hexStr, NULL, 16));
}
// DJB2a string hash over the first n bytes: hash = hash * 33 ^ byte,
// seeded with 5381.
uint64_t getHash(const char* string, int n) {
  uint64_t hash = 5381;
  for (int idx = 0; idx < n; ++idx) {
    hash = (hash * 33) ^ string[idx];
  }
  return hash;
}
/* Generate a hash of the unique identifying string for this host
 * that will be unique for both bare-metal and container instances
 * Equivalent of a hash of;
 *
 * $(hostname)$(cat /proc/sys/kernel/random/boot_id)
 *
 * This string can be overridden by using the MSCCLPP_HOSTID env var.
 */
uint64_t computeHostHash(void) {
  char hostHash[1024];
  char* hostId;
  // Fall back is the full hostname if something fails
  std::string hostName = getHostName(sizeof(hostHash), '\0');
  strncpy(hostHash, hostName.c_str(), sizeof(hostHash));
  int offset = strlen(hostHash);
  if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) {
    INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId);
    strncpy(hostHash, hostId, sizeof(hostHash));
  } else {
    FILE* file = fopen(HOSTID_FILE, "r");
    if (file != nullptr) {
      char* p;
      if (fscanf(file, "%ms", &p) == 1) {
        strncpy(hostHash + offset, p, sizeof(hostHash) - offset - 1);
        free(p);
      }
      // Bug fix: fclose() used to sit outside this null check, so a failed
      // fopen() (e.g. no boot_id available) called fclose(NULL) -- UB/crash.
      fclose(file);
    }
  }
  // Make sure the string is terminated
  hostHash[sizeof(hostHash) - 1] = '\0';
  TRACE(MSCCLPP_INIT, "unique hostname '%s'", hostHash);
  return getHash(hostHash, strlen(hostHash));
}
uint64_t getHostHash(void) {
thread_local std::unique_ptr<uint64_t> hostHash = std::make_unique<uint64_t>(computeHostHash());
return *hostHash;
}
/* Generate a hash of the unique identifying string for this process
 * that will be unique for both bare-metal and container instances
 * Equivalent of a hash of;
 *
 * $$ $(readlink /proc/self/ns/pid)
 */
uint64_t getPidHash(void) {
  char pname[1024];
  // Start off with our pid ($$)
  sprintf(pname, "%ld", (long)getpid());
  int plen = strlen(pname);
  // Append the PID-namespace link target so processes in different containers
  // that share a numeric pid still hash differently. readlink() does not
  // NUL-terminate, hence the bounded length and manual termination below.
  int len = readlink("/proc/self/ns/pid", pname + plen, sizeof(pname) - 1 - plen);
  if (len < 0) len = 0;  // readlink failed (e.g. /proc unavailable): hash the pid alone
  pname[plen + len] = '\0';
  TRACE(MSCCLPP_INIT, "unique PID '%s'", pname);
  return getHash(pname, strlen(pname));
}
// Parse a comma-separated interface list such as "eth0:1234,ib0,eth" into
// (prefix, port) pairs. A ':' introduces an optional port for the preceding
// prefix; an entry without one gets port -1 (wildcard, see matchPort).
// Returns the number of entries written to ifList, at most maxList.
// A NULL input yields 0.
// NOTE(review): ifC is not bounded against sizeof(netIf::prefix); a prefix of
// 64+ chars would overflow -- confirm callers only pass short env strings.
int parseStringList(const char* string, netIf* ifList, int maxList) {
  if (!string) return 0;
  const char* ptr = string;
  int ifNum = 0;
  int ifC = 0;  // chars accumulated for the current entry's prefix
  char c;
  do {
    c = *ptr;
    if (c == ':') {
      // Prefix finished, explicit port follows; atoi stops at the separator.
      if (ifC > 0) {
        ifList[ifNum].prefix[ifC] = '\0';
        ifList[ifNum].port = atoi(ptr + 1);
        ifNum++;
        ifC = 0;
      }
      // Skip over the port digits up to the next ',' or end of string.
      while (c != ',' && c != '\0') c = *(++ptr);
    } else if (c == ',' || c == '\0') {
      // Prefix finished with no port: -1 means "any port".
      if (ifC > 0) {
        ifList[ifNum].prefix[ifC] = '\0';
        ifList[ifNum].port = -1;
        ifNum++;
        ifC = 0;
      }
    } else {
      // Still inside a prefix: accumulate the character.
      ifList[ifNum].prefix[ifC] = c;
      ifC++;
    }
    ptr++;
  } while (ifNum < maxList && c);
  return ifNum;
}
bool matchIfList(const char* string, int port, netIf* ifList, int listSize, bool matchExact) {
// Make an exception for the case where no user list is defined
if (listSize == 0) return true;
for (int i = 0; i < listSize; i++) {
if (matchIf(string, ifList[i].prefix, matchExact) && matchPort(port, ifList[i].port)) {
return true;
}
}
return false;
}
// Capture the current point on the monotonic (steady) clock.
TimePoint getClock() { return std::chrono::steady_clock::now(); }

// Number of whole seconds elapsed between two timestamps from getClock().
int64_t elapsedClock(TimePoint start, TimePoint end) {
  const auto delta = end - start;
  return std::chrono::duration_cast<std::chrono::seconds>(delta).count();
}
} // namespace mscclpp

View File

@@ -1,46 +1,45 @@
#include <mscclpp/core.hpp>
#include <mscclpp/channel.hpp>
#include <mscclpp/core.hpp>
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
#include <cassert>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include <unordered_map>
static int nranksPerNode = 8;
// Propagate errors up
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
return res; \
} \
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
return res; \
} \
} while (0)
// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)
// Measure current time in second.
static double getTime(void)
{
static double getTime(void) {
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
@@ -52,28 +51,23 @@ static double getTime(void)
__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16];
__device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int remoteRank,
size_t nelemsPerGPU)
{
size_t nelemsPerGPU) {
// this allgather is really simple and implemented as an alltoall
// this thread's role is a sender role
// put your data asynchronously
if ((threadIdx.x % 32) == 0)
devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
if ((threadIdx.x % 32) == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
// make sure everyone is put their data before some thread randomly blocks everyone else in signal
__syncthreads();
// push with flag and sync to make sure the data is received
if ((threadIdx.x % 32) == 0)
devChan.flush();
if ((threadIdx.x % 32) == 0) devChan.flush();
// this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
if ((threadIdx.x % 32) == 0)
devChan.wait();
if ((threadIdx.x % 32) == 0) devChan.wait();
}
__device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size,
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size)
{
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
// this allgather algorithm works as follows:
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
// and waits for data from GPU rank (i-1) % nranksPerNode
@@ -83,31 +77,26 @@ __device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, in
for (int i = 1; i < nranksPerNode; i++) {
if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
// put your data to GPU (rank+i) % nranksPerNode and signal in one call
if ((threadIdx.x % 32) == 0)
devChan.putWithSignal(offset, size);
if ((threadIdx.x % 32) == 0) devChan.putWithSignal(offset, size);
}
// wait for the data from GPU (rank-i) % nranksPerNode to arrive
if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
if ((threadIdx.x % 32) == 0)
devChan.wait();
if ((threadIdx.x % 32) == 0) devChan.wait();
}
asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
}
}
__device__ void allgather1(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode,
int remoteRank, size_t nelemsPerGPU)
{
int remoteRank, size_t nelemsPerGPU) {
localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
if (remoteRank / nranksPerNode == rank / nranksPerNode)
if ((threadIdx.x % 32) == 0)
devChan.flush();
if ((threadIdx.x % 32) == 0) devChan.flush();
}
__device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode,
int remoteRank, size_t nelemsPerGPU)
{
int remoteRank, size_t nelemsPerGPU) {
// this allgather is a pipelined and hierarchical one and only works for two nodes
// it is implemented as follows:
// Step 1: each node does a local allgather and concurrently,
@@ -132,8 +121,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra
if ((threadIdx.x % 32) == 0)
devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
if ((threadIdx.x % 32) == 0)
devChan.wait();
if ((threadIdx.x % 32) == 0) devChan.wait();
}
__syncthreads();
@@ -152,8 +140,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra
if ((threadIdx.x % 32) == 0)
devChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
if ((threadIdx.x % 32) == 0)
devChan.wait();
if ((threadIdx.x % 32) == 0) devChan.wait();
}
__syncthreads();
@@ -167,13 +154,11 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra
}
if (remoteRank / nranksPerNode == rank / nranksPerNode || remoteRank % nranksPerNode == rank % nranksPerNode) {
if ((threadIdx.x % 32) == 0)
devChan.flush();
if ((threadIdx.x % 32) == 0) devChan.flush();
}
}
__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel)
{
__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) {
// find the mapping between remoteRank and devChans
int warpId = threadIdx.x / 32;
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
@@ -188,18 +173,11 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem
allgather2(devChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
}
int rankToLocalRank(int rank)
{
return rank % nranksPerNode;
}
int rankToLocalRank(int rank) { return rank % nranksPerNode; }
int rankToNode(int rank)
{
return rank / nranksPerNode;
}
int rankToNode(int rank) { return rank / nranksPerNode; }
void print_usage(const char* prog)
{
void print_usage(const char* prog) {
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
@@ -208,8 +186,7 @@ void print_usage(const char* prog)
}
void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
int** data_d)
{
int** data_d) {
CUDACHECK(cudaMalloc(data_d, dataSize));
CUDACHECK(cudaMemset(*data_d, 0, dataSize));
@@ -226,8 +203,7 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz
}
void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& comm,
mscclpp::channel::DeviceChannelService& channelService, int* data_d, size_t dataSize)
{
mscclpp::channel::DeviceChannelService& channelService, int* data_d, size_t dataSize) {
int thisNode = rankToNode(rank);
int cudaNum = rankToLocalRank(rank);
std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum);
@@ -237,8 +213,7 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemories;
for (int r = 0; r < world_size; ++r) {
if (r == rank)
continue;
if (r == rank) continue;
mscclpp::Transport transport;
if (rankToNode(r) == thisNode) {
transport = mscclpp::Transport::CudaIpc;
@@ -267,8 +242,7 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co
sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size()));
}
void printUsage(const char* prog, bool isMpi)
{
void printUsage(const char* prog, bool isMpi) {
if (isMpi) {
std::string st = "you are using MPI for this test\n";
st += "two possilbe usages are:\n";
@@ -284,8 +258,7 @@ void printUsage(const char* prog, bool isMpi)
}
}
std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi)
{
std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi) {
std::unordered_map<std::string, std::string> options;
for (int i = 1; i < argc; i++) {
@@ -356,8 +329,7 @@ std::unordered_map<std::string, std::string> parseArgs(int argc, const char* arg
return options;
}
int main(int argc, const char* argv[])
{
int main(int argc, const char* argv[]) {
bool isMpi = false;
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
isMpi = true;
@@ -415,27 +387,22 @@ int main(int argc, const char* argv[])
size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;
try {
if (rank == 0)
printf("Initializing MSCCL++\n");
if (rank == 0) printf("Initializing MSCCL++\n");
auto bootstrapper = std::make_shared<mscclpp::Bootstrap>(rank, world_size);
bootstrapper->initialize(ip_port);
mscclpp::Communicator comm(bootstrapper);
mscclpp::channel::DeviceChannelService channelService(comm);
if (rank == 0)
printf("Initializing data for allgather test\n");
if (rank == 0) printf("Initializing data for allgather test\n");
initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);
if (rank == 0)
printf("Setting up the connection in MSCCL++\n");
if (rank == 0) printf("Setting up the connection in MSCCL++\n");
setupMscclppConnections(rank, world_size, comm, channelService, data_d, dataSize);
if (rank == 0)
printf("Launching MSCCL++ proxy threads\n");
if (rank == 0) printf("Launching MSCCL++ proxy threads\n");
channelService.startProxy();
if (rank == 0)
printf("Testing the correctness of AllGather implementation\n");
if (rank == 0) printf("Testing the correctness of AllGather implementation\n");
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
CUDACHECK(cudaDeviceSynchronize());
@@ -453,13 +420,11 @@ int main(int argc, const char* argv[])
int tmp[16];
// A simple barrier
bootstrapper->allGather(tmp, sizeof(int));
if (rank == 0)
printf("Successfully checked the correctness\n");
if (rank == 0) printf("Successfully checked the correctness\n");
// Perf test
int iterwithoutcudagraph = 10;
if (rank == 0)
printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
CUDACHECK(cudaStreamSynchronize(stream));
bootstrapper->allGather(tmp, sizeof(int));
for (int i = 0; i < iterwithoutcudagraph; ++i) {
@@ -470,8 +435,7 @@ int main(int argc, const char* argv[])
// cudaGraph Capture
int cudagraphiter = 10;
if (rank == 0)
printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
@@ -510,8 +474,7 @@ int main(int argc, const char* argv[])
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
bootstrapper->allGather(tmp, sizeof(int));
if (rank == 0)
printf("Stopping MSCCL++ proxy threads\n");
if (rank == 0) printf("Stopping MSCCL++ proxy threads\n");
channelService.stopProxy();
} catch (std::exception& e) {

View File

@@ -1,18 +1,19 @@
#include <mscclpp/checks.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/epoch.hpp>
#include <mscclpp/fifo.hpp>
#include <mscclpp/proxy.hpp>
#include <mscclpp/epoch.hpp>
#include <mscclpp/checks.hpp>
#include <utils.h>
#include <numa.hpp>
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <iostream>
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include <iostream>
#include <string>
#include <unordered_map>
int nranksPerNode;
@@ -22,18 +23,17 @@ int world_size;
// Propagate errors up
// Check CUDA RT calls
#define CUCHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
#define CUCHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)
// Measure current time in second.
static double getTime(void)
{
static double getTime(void) {
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
@@ -42,38 +42,28 @@ static double getTime(void)
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}
__global__ void kernel(int r, int nranks, mscclpp::DeviceProxyFifo fifo, mscclpp::DeviceEpoch::DeviceHandle* handles, int handleIndex)
{
__global__ void kernel(int r, int nranks, mscclpp::DeviceProxyFifo fifo, mscclpp::DeviceEpoch::DeviceHandle* handles,
int handleIndex) {
int tid = threadIdx.x;
if (tid != r)
handles[tid].epochIncrement();
if (tid != r) handles[tid].epochIncrement();
__syncthreads();
// uint64_t tail;
if (tid == 0){
if (tid == 0) {
mscclpp::ProxyTrigger trigger;
trigger.fst = handleIndex;
fifo.push(trigger);
// tail = fifo.push(trigger);
}
if (tid != r)
handles[tid].wait();
if (tid != r) handles[tid].wait();
// if (tid == 0)
// while(*(volatile uint64_t*)fifo.tailReplica < tail) {};
}
int rankToLocalRank(int rank)
{
return rank % nranksPerNode;
}
int rankToLocalRank(int rank) { return rank % nranksPerNode; }
int rankToNode(int rank)
{
return rank / nranksPerNode;
}
int rankToNode(int rank) { return rank / nranksPerNode; }
void print_usage(const char* prog)
{
void print_usage(const char* prog) {
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
@@ -82,8 +72,7 @@ void print_usage(const char* prog)
}
void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
int** data_d)
{
int** data_d) {
CUCHECK(cudaMalloc(data_d, dataSize));
CUCHECK(cudaMemset(*data_d, 0, dataSize));
@@ -100,7 +89,7 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz
}
class MyProxyService {
private:
private:
int deviceNumaNode_;
mscclpp::Proxy proxy_;
std::vector<mscclpp::RegisteredMemory> remoteMemories_;
@@ -109,13 +98,17 @@ private:
std::vector<std::shared_ptr<mscclpp::DeviceEpoch>> deviceEpochs1_;
std::vector<std::shared_ptr<mscclpp::DeviceEpoch>> deviceEpochs2_;
std::vector<std::shared_ptr<mscclpp::Connection>> connections_;
int dataSize_;
public:
MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize) : remoteMemories_(world_size), connections_(world_size), dataSize_(dataSize),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int dataSize_;
public:
MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize)
: remoteMemories_(world_size),
connections_(world_size),
dataSize_(dataSize),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
CUCHECK(cudaGetDevice(&cudaDevice));
getDeviceNumaNode(cudaDevice, &deviceNumaNode_);
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);
int thisNode = rankToNode(rank);
int cudaNum = rankToLocalRank(rank);
@@ -123,10 +116,9 @@ public:
mscclpp::Transport ibTransport = mscclpp::getIBTransportByDeviceName(ibDevStr);
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemoriesFuture(world_size);
localMemory_ = comm.registerMemory(data_d, dataSize, mscclpp::Transport::CudaIpc | ibTransport);
for (int r = 0; r < world_size; ++r) {
if (r == rank){
if (r == rank) {
hostEpochs_.emplace_back(nullptr);
deviceEpochs1_.emplace_back(nullptr);
deviceEpochs2_.emplace_back(nullptr);
@@ -148,23 +140,23 @@ public:
deviceEpochs1_.emplace_back(std::make_shared<mscclpp::DeviceEpoch>(comm, connections_[r]));
deviceEpochs2_.emplace_back(std::make_shared<mscclpp::DeviceEpoch>(comm, connections_[r]));
comm.sendMemoryOnSetup(localMemory_, r, 0);
remoteMemoriesFuture[r] = comm.recvMemoryOnSetup(r, 0);
}
comm.setup();
for (int r = 0; r < world_size; ++r) {
if (r == rank){
if (r == rank) {
continue;
}
remoteMemories_[r] = remoteMemoriesFuture[r].get();
}
}
}
void bindThread() {
if (deviceNumaNode_ >= 0) {
numaBind(deviceNumaNode_);
mscclpp::numaBind(deviceNumaNode_);
}
}
@@ -174,45 +166,34 @@ public:
int dataSizePerRank = dataSize_ / world_size;
for (int r = 1; r < world_size; ++r) {
int nghr = (rank + r) % world_size;
connections_[nghr]->write(remoteMemories_[nghr], rank*dataSizePerRank, localMemory_, rank*dataSizePerRank, dataSizePerRank);
connections_[nghr]->write(remoteMemories_[nghr], rank * dataSizePerRank, localMemory_, rank * dataSizePerRank,
dataSizePerRank);
if (triggerRaw.fst == 1)
deviceEpochs1_[nghr]->signal();
else
deviceEpochs2_[nghr]->signal();
if ((flusher % 64) == 0 && mscclpp::AllIBTransports.has(connections_[nghr]->transport())){
if ((flusher % 64) == 0 && mscclpp::AllIBTransports.has(connections_[nghr]->transport())) {
// if we are using IB transport, we need a flush every once in a while
connections_[nghr]->flush();
}
}
flusher++;
}
return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
}
void start(){
proxy_.start();
}
void start() { proxy_.start(); }
void stop(){
proxy_.stop();
}
void stop() { proxy_.stop(); }
mscclpp::HostProxyFifo& fifo(){
return proxy_.fifo();
}
mscclpp::HostProxyFifo& fifo() { return proxy_.fifo(); }
mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle1(int r){
return deviceEpochs1_[r]->deviceHandle();
}
mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle1(int r) { return deviceEpochs1_[r]->deviceHandle(); }
mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle2(int r){
return deviceEpochs2_[r]->deviceHandle();
}
mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle2(int r) { return deviceEpochs2_[r]->deviceHandle(); }
};
std::unordered_map<std::string, std::string> parseArgs(int argc, char* argv[])
{
std::unordered_map<std::string, std::string> parseArgs(int argc, char* argv[]) {
std::unordered_map<std::string, std::string> options;
for (int i = 1; i < argc; i++) {
@@ -234,9 +215,7 @@ std::unordered_map<std::string, std::string> parseArgs(int argc, char* argv[])
return options;
}
int main(int argc, char* argv[])
{
int main(int argc, char* argv[]) {
// sleep(10);
MPI_Init(&argc, &argv);
auto parsedArgs = parseArgs(argc, argv);
@@ -251,16 +230,13 @@ int main(int argc, char* argv[])
nranksPerNode = shmrank;
MPI_Comm_free(&shmcomm);
int cudaNum = rankToLocalRank(rank);
CUCHECK(cudaSetDevice(cudaNum));
if (rank == 0)
printf("Initializing MSCCL++\n");
if (rank == 0) printf("Initializing MSCCL++\n");
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, world_size);
mscclpp::UniqueId uniqueId;
if (rank == 0)
uniqueId = bootstrap->createUniqueId();
if (rank == 0) uniqueId = bootstrap->createUniqueId();
MPI_Bcast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(uniqueId);
mscclpp::Communicator comm(bootstrap);
@@ -273,22 +249,18 @@ int main(int argc, char* argv[])
}
size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;
if (rank == 0)
printf("Initializing data for allgather test\n");
if (rank == 0) printf("Initializing data for allgather test\n");
initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);
if (rank == 0)
printf("Setting up the connection in MSCCL++\n");
if (rank == 0) printf("Setting up the connection in MSCCL++\n");
MyProxyService proxyService(comm, data_d, dataSize);
// setupProxyService(comm, proxyService, data_d, dataSize);
if (rank == 0)
printf("Launching MSCCL++ proxy threads\n");
if (rank == 0) printf("Launching MSCCL++ proxy threads\n");
proxyService.start();
mscclpp::DeviceProxyFifo fifo = proxyService.fifo().deviceFifo();
if (rank == 0)
printf("Testing the correctness of AllGather implementation\n");
if (rank == 0) printf("Testing the correctness of AllGather implementation\n");
cudaStream_t stream;
CUCHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
mscclpp::DeviceEpoch::DeviceHandle* deviceHandles1;
@@ -296,18 +268,18 @@ int main(int argc, char* argv[])
CUCHECK(cudaMalloc(&deviceHandles1, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank)
continue;
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle1(i);
CUCHECK(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice));
CUCHECK(
cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice));
}
CUCHECK(cudaMalloc(&deviceHandles2, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank)
continue;
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle2(i);
CUCHECK(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice));
CUCHECK(
cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice));
}
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
@@ -324,13 +296,11 @@ int main(int argc, char* argv[])
}
bootstrap->barrier();
if (rank == 0)
printf("Correctness test passed!\n");
if (rank == 0) printf("Correctness test passed!\n");
double t0, t1, ms, time_in_us;
int iterwithoutcudagraph = 10;
if (rank == 0)
printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
CUCHECK(cudaStreamSynchronize(stream));
bootstrap->barrier();
t0 = getTime();
@@ -344,12 +314,11 @@ int main(int argc, char* argv[])
ms = (t1 - t0) * 1000.0;
time_in_us = ms * 1000. / (float)iterwithoutcudagraph / 2;
printf("No Graph %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us,
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
// cudaGraph Capture
int cudagraphiter = 10;
if (rank == 0)
printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
@@ -363,7 +332,7 @@ int main(int argc, char* argv[])
int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
}
@@ -373,7 +342,7 @@ int main(int argc, char* argv[])
int cudagraphlaunch = 10;
if (rank == 0)
printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch,
cudagraphiter);
cudagraphiter);
bootstrap->barrier();
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
@@ -386,11 +355,10 @@ int main(int argc, char* argv[])
time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter / 2;
if (rank == 0)
printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us,
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
bootstrap->barrier();
if (rank == 0)
printf("Stopping MSCCL++ proxy threads\n");
if (rank == 0) printf("Stopping MSCCL++ proxy threads\n");
proxyService.stop();
MSCCLPP_CUDATHROW(cudaFree(data_d));

View File

@@ -13,84 +13,57 @@ void* scratch = nullptr;
void* sendRecvData = nullptr;
cuda::barrier<cuda::thread_scope_device>* barrier = nullptr;
struct Chunk
{
struct Chunk {
size_t offset;
size_t size;
};
inline int getSendTag(int rank, int peer)
{
return rank < peer ? 0 : 1;
}
inline int getSendTag(int rank, int peer) { return rank < peer ? 0 : 1; }
inline int getRecvTag(int rank, int peer)
{
return rank < peer ? 1 : 0;
}
inline int getRecvTag(int rank, int peer) { return rank < peer ? 1 : 0; }
__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount)
{
__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) {
size_t remainder = dataCount % numChunks;
size_t smallChunkSize = dataCount / numChunks;
size_t largeChunkSize = smallChunkSize + 1;
size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
size_t numSmallChunks = chunkCount - numLargeChunks;
size_t offset =
(remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
size_t offset = (remainder - numLargeChunks) * largeChunkSize +
(chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize};
}
__host__ __device__ int peerIdx(int peerRank, int rank)
{
return peerRank < rank ? peerRank : peerRank - 1;
}
__host__ __device__ int peerIdx(int peerRank, int rank) { return peerRank < rank ? peerRank : peerRank - 1; }
__host__ __device__ int peerRank(int peerIdx, int rank)
{
return peerIdx < rank ? peerIdx : peerIdx + 1;
}
__host__ __device__ int peerRank(int peerIdx, int rank) { return peerIdx < rank ? peerIdx : peerIdx + 1; }
__host__ __device__ int phase1SendConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3;
}
__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3; }
__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3 + 1;
}
__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 1; }
__host__ __device__ int phase2ConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3 + 2;
}
__host__ __device__ int phase2ConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 2; }
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size)
{
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) {
if (threadIdx.x == 0) {
conn.putWithSignalAndFlush(dstOffset, srcOffset, size);
}
__syncthreads();
}
__device__ void recv(mscclppDevConn_t& conn)
{
__device__ void recv(mscclppDevConn_t& conn) {
if (threadIdx.x == 0) {
conn.wait();
}
__syncthreads();
}
__device__ void reduceSum(int* dst, int* src, size_t size)
{
__device__ void reduceSum(int* dst, int* src, size_t size) {
for (int i = threadIdx.x; i < size; i += blockDim.x) {
dst[i] += src[i];
}
}
__global__ void initData(int* data, size_t size, int rank)
{
__global__ void initData(int* data, size_t size, int rank) {
for (int i = threadIdx.x; i < size; i += blockDim.x) {
data[i] = rank;
}
@@ -98,8 +71,7 @@ __global__ void initData(int* data, size_t size, int rank)
__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount,
mscclppDevConn_t* conns, void* scratch, void* sendRecvData,
cuda::barrier<cuda::thread_scope_device>* barrier)
{
cuda::barrier<cuda::thread_scope_device>* barrier) {
int idx = blockIdx.x;
int peer = peerRank(idx, rank);
mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)];
@@ -116,8 +88,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t
send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int));
recv(phase1RecvConn);
if (threadIdx.x == 0)
barrier->arrive_and_wait();
if (threadIdx.x == 0) barrier->arrive_and_wait();
__syncthreads();
// Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
@@ -135,8 +106,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t
reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
}
if (threadIdx.x == 0)
barrier->arrive_and_wait();
if (threadIdx.x == 0) barrier->arrive_and_wait();
__syncthreads();
// 2nd communication phase: send the now reduced data between the user buffers
@@ -147,8 +117,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t
}
void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
size_t* recvInplaceOffset, size_t count, int nranks)
{
size_t* recvInplaceOffset, size_t count, int nranks) {
size_t base = (count / ALIGN) * ALIGN;
*sendcount = base;
*recvcount = base;
@@ -157,14 +126,12 @@ void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* par
*paramcount = base;
}
void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
{
void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AllReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AllReduceInitData(struct testArgs* args, int in_place)
{
testResult_t AllReduceInitData(struct testArgs* args, int in_place) {
size_t recvcount = args->expectedBytes / sizeof(int);
CUDACHECK(cudaSetDevice(args->gpuNum));
@@ -182,8 +149,7 @@ testResult_t AllReduceInitData(struct testArgs* args, int in_place)
return testSuccess;
}
void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
{
void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
*algBw = baseBw;
@@ -192,8 +158,7 @@ void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, doubl
}
testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm,
cudaStream_t stream, int kernelNum)
{
cudaStream_t stream, int kernelNum) {
int worldSize = comm->nRanks;
int nPeers = worldSize - 1;
int dataCount = nBytes / sizeof(int);
@@ -207,8 +172,7 @@ testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode,
struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData,
AllReduceGetBw, AllReduceRunColl};
testResult_t AllReduceSetupMscclppConnections(struct testArgs* args)
{
testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) {
int rank = args->proc, worldSize = args->totalProcs;
size_t bufferSize = args->maxbytes;
Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1);
@@ -224,7 +188,7 @@ testResult_t AllReduceSetupMscclppConnections(struct testArgs* args)
MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(
mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
}
}
MSCCLPPCHECK(mscclppConnectionSetup(args->comm));
@@ -232,8 +196,7 @@ testResult_t AllReduceSetupMscclppConnections(struct testArgs* args)
return testSuccess;
}
testResult_t AllReduceTeardownMscclppConnections()
{
testResult_t AllReduceTeardownMscclppConnections() {
if (scratch != nullptr) {
CUDACHECK(cudaFree(scratch));
scratch = nullptr;
@@ -241,15 +204,14 @@ testResult_t AllReduceTeardownMscclppConnections()
return testSuccess;
}
testResult_t AllReduceRunTest(struct testArgs* args)
{
testResult_t AllReduceRunTest(struct testArgs* args) {
args->collTest = &allReduceTest;
sendRecvData = args->recvbuff;
CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
cuda::barrier<cuda::thread_scope_device> initBarrier(args->totalProcs - 1);
CUDACHECK(
cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
int nPeers = args->totalProcs - 1;
int rank = args->proc;
std::vector<mscclppDevConn_t> hostConns(nPeers * 3, mscclppDevConn_t());

View File

@@ -1,116 +0,0 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include <mpi.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
void print_usage(const char* prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
std::string st = "you are using MPI for this test\n";
st += "tow possilbe usages are:\n";
st += "> " + std::string(prog) + "\n";
st += "or\n";
st += "> " + std::string(prog) + " ip:port\n";
printf("%s", st.c_str());
#else
std::string st = "you are NOT using MPI for this test\n";
st += "the only possible usage:\n";
st += "> " + std::string(prog) + " ip:port rank world_size\n";
printf("%s", st.c_str());
#endif
}
void myLogHandler(const char* msg)
{
printf("myLogger: %s", msg);
}
int main(int argc, const char* argv[])
{
if (argc >= 2 && (std::string(argv[1]) == "-h" || std::string(argv[1]) == "--help")) {
print_usage(argv[0]);
return 0;
}
int rank, world_size;
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc > 2) {
print_usage(argv[0]);
return -1;
}
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
const char* ip_port;
if (argc == 2)
ip_port = argv[1];
else
ip_port = NULL;
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
const char* ip_port = argv[1];
rank = atoi(argv[2]);
world_size = atoi(argv[3]);
#endif
MSCCLPPCHECK(mscclppSetLogHandler(myLogHandler));
mscclppComm_t comm;
if (ip_port) {
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank));
} else {
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
mscclppUniqueId id;
if (rank == 0)
MSCCLPPCHECK(mscclppGetUniqueId(&id));
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
MSCCLPPCHECK(mscclppCommInitRankFromId(&comm, world_size, id, rank));
#else
fprintf(stderr, "this should have not been possible!\n");
return -1;
#endif
}
// allocate some test buffer
int* buf = (int*)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
return -1;
}
// each rank sets one element in the array
buf[rank] = rank;
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, buf, sizeof(int)));
// check the correctness of all elements in the output of AllGather
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i) {
printf("wrong data: %d, expected %d\n", buf[i], i);
return -1;
}
}
MSCCLPPCHECK(mscclppCommDestroy(comm));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Finalize();
#endif
printf("Rank %d Succeeded\n", rank);
return 0;
}

View File

@@ -1,34 +1,28 @@
#include <mscclpp/core.hpp>
#include <mpi.h>
#include <cassert>
#include <iostream>
#include <memory>
#include <mpi.h>
#include <mscclpp/core.hpp>
void test_allgather(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
{
void test_allgather(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
std::vector<int> tmp(bootstrap->getNranks(), 0);
tmp[bootstrap->getRank()] = bootstrap->getRank() + 1;
bootstrap->allGather(tmp.data(), sizeof(int));
for (int i = 0; i < bootstrap->getNranks(); i++) {
assert(tmp[i] == i + 1);
}
if (bootstrap->getRank() == 0)
std::cout << "AllGather test passed!" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "AllGather test passed!" << std::endl;
}
void test_barrier(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
{
void test_barrier(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Barrier test passed!" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Barrier test passed!" << std::endl;
}
void test_sendrecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
{
void test_sendrecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i)
continue;
if (bootstrap->getRank() == i) continue;
int msg1 = (bootstrap->getRank() + 1) * 3;
int msg2 = (bootstrap->getRank() + 1) * 3 + 1;
int msg3 = (bootstrap->getRank() + 1) * 3 + 2;
@@ -38,8 +32,7 @@ void test_sendrecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
}
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i)
continue;
if (bootstrap->getRank() == i) continue;
int msg1 = 0;
int msg2 = 0;
int msg3 = 0;
@@ -51,102 +44,79 @@ void test_sendrecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
assert(msg2 == (i + 1) * 3 + 1);
assert(msg3 == (i + 1) * 3 + 2);
}
if (bootstrap->getRank() == 0)
std::cout << "Send/Recv test passed!" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Send/Recv test passed!" << std::endl;
}
void test_all(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap)
{
void test_all(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
test_allgather(bootstrap);
test_barrier(bootstrap);
test_sendrecv(bootstrap);
}
void test_mscclpp_bootstrap_with_id(int rank, int worldSize)
{
void test_mscclpp_bootstrap_with_id(int rank, int worldSize) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0)
id = bootstrap->createUniqueId();
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
test_all(bootstrap);
if (bootstrap->getRank() == 0)
std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl;
}
void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPiar)
{
void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPair) {
std::shared_ptr<mscclpp::Bootstrap> bootstrap(new mscclpp::Bootstrap(rank, worldSize));
bootstrap->initialize(ipPortPiar);
bootstrap->initialize(ipPortPair);
test_all(bootstrap);
if (bootstrap->getRank() == 0)
std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl;
}
class MPIBootstrap : public mscclpp::BaseBootstrap
{
public:
MPIBootstrap() : BaseBootstrap()
{
}
int getRank() override
{
class MPIBootstrap : public mscclpp::BaseBootstrap {
public:
MPIBootstrap() : BaseBootstrap() {}
int getRank() override {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
int getNranks() override
{
int getNranks() override {
int worldSize;
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
return worldSize;
}
void allGather(void* sendbuf, int size) override
{
void allGather(void* sendbuf, int size) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD);
}
void barrier() override
{
MPI_Barrier(MPI_COMM_WORLD);
}
void send(void* sendbuf, int size, int dest, int tag) override
{
void barrier() override { MPI_Barrier(MPI_COMM_WORLD); }
void send(void* sendbuf, int size, int dest, int tag) override {
MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
}
void recv(void* recvbuf, int size, int source, int tag) override
{
void recv(void* recvbuf, int size, int source, int tag) override {
MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
};
void test_mpi_bootstrap()
{
void test_mpi_bootstrap() {
std::shared_ptr<mscclpp::BaseBootstrap> bootstrap(new MPIBootstrap());
test_all(bootstrap);
if (bootstrap->getRank() == 0)
std::cout << "--- MPI Bootstrap test passed! ---" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- MPI Bootstrap test passed! ---" << std::endl;
}
int main(int argc, char** argv)
{
int main(int argc, char** argv) {
int rank, worldSize;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
if (argc > 2) {
if (rank == 0)
std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl;
if (rank == 0) std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl;
MPI_Finalize();
return 0;
}
test_mscclpp_bootstrap_with_id(rank, worldSize);
if (argc == 2)
test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]);
if (argc == 2) test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]);
test_mpi_bootstrap();
MPI_Finalize();
return 0;
}
}

View File

@@ -1,23 +1,22 @@
#include <mscclpp/epoch.hpp>
#include <mscclpp/core.hpp>
#include <cuda_runtime.h>
#include <mpi.h>
#include <cassert>
#include <cuda_runtime.h>
#include <iostream>
#include <memory>
#include <mpi.h>
#include <mscclpp/core.hpp>
#include <mscclpp/epoch.hpp>
#include <unordered_map>
#define CUDATHROW(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \
} \
#define CUDATHROW(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \
} \
} while (false)
mscclpp::Transport findIb(int localRank)
{
mscclpp::Transport findIb(int localRank) {
mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2,
mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5,
mscclpp::Transport::IB6, mscclpp::Transport::IB7};
@@ -27,8 +26,7 @@ mscclpp::Transport findIb(int localRank)
void register_all_memories(mscclpp::Communicator& communicator, int rank, int worldSize, void* devicePtr,
size_t deviceBufferSize, mscclpp::Transport myIbDevice,
mscclpp::RegisteredMemory& localMemory,
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteMemory)
{
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteMemory) {
localMemory = communicator.registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice);
std::unordered_map<int, mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> futureRemoteMemory;
for (int i = 0; i < worldSize; i++) {
@@ -47,8 +45,7 @@ void register_all_memories(mscclpp::Communicator& communicator, int rank, int wo
void make_connections(mscclpp::Communicator& communicator, int rank, int worldSize, int nRanksPerNode,
mscclpp::Transport myIbDevice,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections)
{
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections) {
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
if (i / nRanksPerNode == rank / nRanksPerNode) {
@@ -63,8 +60,7 @@ void make_connections(mscclpp::Communicator& communicator, int rank, int worldSi
void write_remote(int rank, int worldSize, std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteRegisteredMemories,
mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank)
{
mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank) {
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
auto& conn = connections.at(i);
@@ -76,8 +72,7 @@ void write_remote(int rank, int worldSize, std::unordered_map<int, std::shared_p
}
}
void device_buffer_init(int rank, int worldSize, int dataCount, std::vector<int*>& devicePtr)
{
void device_buffer_init(int rank, int worldSize, int dataCount, std::vector<int*>& devicePtr) {
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
for (int i = 0; i < dataCount; i++) {
@@ -89,8 +84,7 @@ void device_buffer_init(int rank, int worldSize, int dataCount, std::vector<int*
}
bool test_device_buffer_write_correctness(int rank, int worldSize, int nRanksPerNode, int dataCount,
std::vector<int*>& devicePtr, bool skipLocal = false)
{
std::vector<int*>& devicePtr, bool skipLocal = false) {
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], dataCount * sizeof(int), cudaMemcpyDeviceToHost));
@@ -112,16 +106,13 @@ void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize
std::shared_ptr<mscclpp::BaseBootstrap> bootstrap,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr, int numBuffers)
{
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr, int numBuffers) {
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "CUDA memory initialization passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
@@ -145,20 +136,17 @@ void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize
if (bootstrap->getRank() == 0)
std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
if (bootstrap->getRank() == 0)
std::cout << "--- Testing vanialla writes passed ---" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- Testing vanialla writes passed ---" << std::endl;
}
__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize)
{
__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].epochIncrement();
}
}
__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize)
{
__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].wait();
@@ -171,9 +159,7 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr,
int numBuffers)
{
int numBuffers) {
std::unordered_map<int, std::shared_ptr<mscclpp::DeviceEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
@@ -181,16 +167,14 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i
}
communicator.setup();
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Epochs are created" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl;
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "CUDA memory initialization passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
mscclpp::DeviceEpoch::DeviceHandle* deviceEpochHandles;
CUDATHROW(cudaMalloc(&deviceEpochHandles, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * worldSize));
@@ -204,8 +188,7 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i
CUDATHROW(cudaDeviceSynchronize());
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "CUDA device epochs are created" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "CUDA device epochs are created" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
@@ -238,32 +221,26 @@ void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr,
int numBuffers)
{
int numBuffers) {
std::unordered_map<int, std::shared_ptr<mscclpp::HostEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
if (conn->transport() == mscclpp::Transport::CudaIpc)
continue;
if (conn->transport() == mscclpp::Transport::CudaIpc) continue;
epochs.insert({entry.first, std::make_shared<mscclpp::HostEpoch>(communicator, conn)});
}
communicator.setup();
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Epochs are created" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl;
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "CUDA memory initialization passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Host epochs are created" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Host epochs are created" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
@@ -291,25 +268,21 @@ void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int
<< std::endl;
}
void test_communicator(int rank, int worldSize, int nRanksPerNode)
{
void test_communicator(int rank, int worldSize, int nRanksPerNode) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0)
id = bootstrap->createUniqueId();
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
mscclpp::Communicator communicator(bootstrap);
if (bootstrap->getRank() == 0)
std::cout << "Communicator initialization passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Communicator initialization passed" << std::endl;
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>> connections;
auto myIbDevice = findIb(rank % nRanksPerNode);
make_connections(communicator, rank, worldSize, nRanksPerNode, myIbDevice, connections);
if (bootstrap->getRank() == 0)
std::cout << "Connection setup passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "Connection setup passed" << std::endl;
int numBuffers = 10;
std::vector<int*> devicePtr(numBuffers);
@@ -319,8 +292,7 @@ void test_communicator(int rank, int worldSize, int nRanksPerNode)
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>> remoteMemory(numBuffers);
for (int n = 0; n < numBuffers; n++) {
if (n % 100 == 0)
std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl;
if (n % 100 == 0) std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl;
CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize));
register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n],
remoteMemory[n]);
@@ -338,16 +310,14 @@ void test_communicator(int rank, int worldSize, int nRanksPerNode)
test_write_with_host_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections,
remoteMemory, localMemory, devicePtr, numBuffers);
if (bootstrap->getRank() == 0)
std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl;
for (int n = 0; n < numBuffers; n++) {
CUDATHROW(cudaFree(devicePtr[n]));
}
}
int main(int argc, char** argv)
{
int main(int argc, char** argv) {
int rank, worldSize;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

View File

@@ -1,14 +1,15 @@
#include "checks.h"
#include "ib.hpp"
#include "infiniband/verbs.h"
#include <array>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <array>
#include <string>
#include "checks_internal.hpp"
#include "infiniband/verbs.h"
// Measure current time in second.
static double getTime(void)
{
static double getTime(void) {
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
@@ -20,8 +21,7 @@ static double getTime(void)
// Example usage:
// Receiver: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 0 0 0
// Sender: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 1 0 0
int main(int argc, const char* argv[])
{
int main(int argc, const char* argv[]) {
if (argc != 5) {
printf("Usage: %s <ip:port> <0(recv)/1(send)> <gpu id> <ib id>\n", argv[0]);
return 1;
@@ -31,7 +31,7 @@ int main(int argc, const char* argv[])
int cudaDevId = atoi(argv[3]);
std::string ibDevName = "mlx5_ib" + std::string(argv[4]);
CUDACHECK(cudaSetDevice(cudaDevId));
MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId));
int nelem = 1;
auto data = mscclpp::allocUniqueCuda<int>(nelem);
@@ -53,8 +53,7 @@ int main(int argc, const char* argv[])
bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo));
for (int i = 0; i < bootstrap->getNranks(); ++i) {
if (i == isSend)
continue;
if (i == isSend) continue;
qp->rtr(qpInfo[i]);
qp->rts();
break;

View File

@@ -68,21 +68,16 @@ double parseSize(const char* value) {
return size * units;
}
double allreduceTime(int worldSize, double value, int average)
{
double allreduceTime(int worldSize, double value, int average) {
double accumulator = value;
if (average != 0) {
MPI_Op op = average == 1 ? MPI_SUM
: average == 2 ? MPI_MIN
: average == 3 ? MPI_MAX
: average == 4 ? MPI_SUM
: MPI_Op();
MPI_Op op =
average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? MPI_SUM : MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD);
}
if (average == 1)
accumulator /= worldSize;
if (average == 1) accumulator /= worldSize;
return accumulator;
}
} // namespace
@@ -94,8 +89,7 @@ BaseTestEngine::BaseTestEngine(bool inPlace) : error_(0), inPlace_(inPlace) {
BaseTestEngine::~BaseTestEngine() { cudaStreamDestroy(stream_); }
void BaseTestColl::setupCollTest(const TestArgs& args, size_t size)
{
void BaseTestColl::setupCollTest(const TestArgs& args, size_t size) {
this->worldSize_ = args.totalRanks;
this->typeSize_ = sizeof(int);
this->setupCollTest(size);
@@ -128,12 +122,9 @@ double BaseTestEngine::benchTime() {
return deltaSec;
}
void BaseTestEngine::barrier() {
this->comm_->bootstrapper()->barrier();
}
void BaseTestEngine::barrier() { this->comm_->bootstrapper()->barrier(); }
void BaseTestEngine::runTest()
{
void BaseTestEngine::runTest() {
// warm-up for large size
this->coll_->setupCollTest(args_, args_.maxBytes);
this->barrier();

View File

@@ -67,7 +67,7 @@ class BaseTestEngine {
virtual ~BaseTestEngine();
virtual void allocateBuffer() = 0;
int getTestErrors() {return error_;}
int getTestErrors() { return error_; }
void setupTest();
void bootstrap(const TestArgs& args);
void runTest();

View File

@@ -1,288 +0,0 @@
#include "mscclpp.h"
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include "common.h"
#define RANKS_PER_NODE 8
#define USE_DMA_FOR_P2P 1
#define TEST_CONN_TYPE 0 // 0: P2P(for local)+IB(for remote), 1: IB-Only
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)
// Measure current time in second.
static double getTime(void)
{
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
exit(EXIT_FAILURE);
}
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}
__constant__ mscclppDevConn_t constDevConns[16];
__global__ void kernel(int rank, int world_size)
{
if (threadIdx.x % 32 != 0)
return;
int warpId = threadIdx.x / 32;
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
mscclppDevConn_t devConn = constDevConns[remoteRank];
volatile int* data = (volatile int*)devConn.localBuff;
volatile uint64_t* localFlag = devConn.localFlag;
#if (USE_DMA_FOR_P2P == 0)
volatile uint64_t* remoteSignalEpochId = devConn.remoteSignalEpochId;
#endif
volatile uint64_t* proxyFlag = devConn.proxyFlag;
uint64_t baseFlag = *localFlag;
if (threadIdx.x == 0) {
// Set my data and flag
*(data + rank) = rank + 1;
}
__syncthreads();
if (threadIdx.x == 0) {
// Do we need a sys fence?
// __threadfence_system();
*localFlag = baseFlag + 1;
}
// get a thread-local trigger and a request for waiting on it
// mscclppTrigger_t trig;
// mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
// Each warp receives data from different ranks
#if (USE_DMA_FOR_P2P == 1)
// Trigger sending data, flag and synchronize after
auto req = devConn.fifo.putWithSignal(rank * sizeof(int), sizeof(int));
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.sync(req);
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {
}
#else // USE_DMA_FOR_P2P == 0
if (devConn.remoteBuff == NULL) { // IB
// Wait until the proxy have sent my data and flag
devConn.waitTrigger(trig);
// Trigger sending data and flag
devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {
}
} else { // P2P
// Directly read data
volatile int* remoteData = (volatile int*)devConn.remoteBuff;
// Wait until the remote data is set
while (*remoteSignalEpochId == baseFlag) {
}
// Read remote data
data[remoteRank] = remoteData[remoteRank];
}
#endif
}
int rankToLocalRank(int rank)
{
return rank % RANKS_PER_NODE;
}
int rankToNode(int rank)
{
return rank / RANKS_PER_NODE;
}
int cudaNumToIbNum(int cudaNum)
{
int ibNum;
if (cudaNum == 0) {
ibNum = 0;
} else if (cudaNum == 1) {
ibNum = 4;
} else if (cudaNum == 2) {
ibNum = 1;
} else if (cudaNum == 3) {
ibNum = 5;
} else if (cudaNum == 4) {
ibNum = 2;
} else if (cudaNum == 5) {
ibNum = 6;
} else if (cudaNum == 6) {
ibNum = 3;
} else if (cudaNum == 7) {
ibNum = 7;
} else {
printf("Invalid cudaNum: %d\n", cudaNum);
exit(EXIT_FAILURE);
}
return ibNum;
}
int main(int argc, const char* argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Init(NULL, NULL);
#endif
const char* ip_port;
int rank, world_size;
parse_arguments(argc, argv, &ip_port, &rank, &world_size);
int localRank = rankToLocalRank(rank);
int thisNode = rankToNode(rank);
int cudaNum = localRank;
int ibNum = cudaNumToIbNum(cudaNum);
CUDACHECK(cudaSetDevice(cudaNum));
std::string ibDevStr = "mlx5_ib" + std::to_string(ibNum);
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
int* data_d;
uint64_t* flag_d;
size_t data_size = sizeof(int) * world_size;
CUDACHECK(cudaMalloc(&data_d, data_size));
CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
CUDACHECK(cudaMemset(data_d, 0, data_size));
CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t)));
for (int r = 0; r < world_size; ++r) {
if (r == rank)
continue;
mscclppTransport_t transportType = mscclppTransportIB;
const char* ibDev = ibDevStr.c_str();
#if (TEST_CONN_TYPE == 0) // P2P+IB
if (rankToNode(r) == thisNode) {
transportType = mscclppTransportP2P;
ibDev = NULL;
}
#endif
// Connect with all other ranks
MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, data_size, flag_d, transportType, ibDev));
}
MSCCLPPCHECK(mscclppConnectionSetup(comm));
MSCCLPPCHECK(mscclppProxyLaunch(comm));
mscclppDevConn_t* devConns;
int nCons;
MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons));
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * world_size));
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
CUDACHECK(cudaDeviceSynchronize());
// Read results from GPU
int* buf = (int*)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
return -1;
}
CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost));
bool failed = false;
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i + 1) {
printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1);
failed = true;
}
}
if (failed) {
return -1;
}
// Perf test
cudaEvent_t ev_start;
cudaEvent_t ev_end;
CUDACHECK(cudaEventCreate(&ev_start));
CUDACHECK(cudaEventCreate(&ev_end));
// warm up
// int warmupiter = 10;
// for (int i = 0; i < warmupiter; ++i) {
// kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
// }
// cudaGraph Capture
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
int cudagraphiter = 100;
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
int cudagraphwarmup = 10;
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
}
CUDACHECK(cudaStreamSynchronize(stream));
// measure runtime
// CUDACHECK(cudaEventRecord(ev_start, stream));
double t0 = getTime();
int cudagraphlaunch = 10;
for (int i = 0; i < cudagraphlaunch; ++i) {
// kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
cudaGraphLaunch(instance, stream);
}
// CUDACHECK(cudaEventRecord(ev_end, stream));
CUDACHECK(cudaStreamSynchronize(stream));
double t1 = getTime();
float ms = (t1 - t0) * 1000.0;
// CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end));
printf("rank: %d, time: %f us/iter\n", rank, ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter);
MSCCLPPCHECK(mscclppProxyStop(comm));
MSCCLPPCHECK(mscclppCommDestroy(comm));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}

View File

@@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <mscclpp/core.hpp>
class LocalCommunicatorTest : public ::testing::Test {

View File

@@ -1,4 +1,5 @@
#include <gtest/gtest.h>
#include <mscclpp/cuda_utils.hpp>
TEST(CudaMemoryTest, Shared) {