diff --git a/CMakeLists.txt b/CMakeLists.txt index 01354076..5635433f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,7 @@ option(INSTALL_GTEST OFF) FetchContent_MakeAvailable(googletest) include(GoogleTest) -set(CLANG_FORMAT_SOURCE_DIRS include src tests) +set(CLANG_FORMAT_SOURCE_DIRS include src test) include(${PROJECT_SOURCE_DIR}/cmake/AddClangFormatTargets.cmake) add_library(mscclpp SHARED) diff --git a/docs/Doxyfile b/docs/Doxyfile index 2094539d..c992bb71 100644 --- a/docs/Doxyfile +++ b/docs/Doxyfile @@ -829,7 +829,7 @@ WARN_LOGFILE = # spaces. See also FILE_PATTERNS and EXTENSION_MAPPING # Note: If this tag is empty the current directory is searched. -INPUT = ../src/include +INPUT = ../include/mscclpp # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses diff --git a/include/mscclpp/utils.hpp b/include/mscclpp/utils.hpp index 7c2da2e1..3614fa02 100644 --- a/include/mscclpp/utils.hpp +++ b/include/mscclpp/utils.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace mscclpp { @@ -42,7 +43,7 @@ inline std::string getHostName(int maxlen, const char delim) { std::string hostname(maxlen + 1, '\0'); if (gethostname(const_cast(hostname.data()), maxlen) != 0) { std::strncpy(const_cast(hostname.data()), "unknown", maxlen); - throw; + throw Error("gethostname failed", ErrorCode::SystemError); } int i = 0; while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++; diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index dfd22d64..2f764762 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -11,10 +11,9 @@ #include #include "api.h" -#include "checks.h" #include "checks_internal.hpp" #include "socket.h" -#include "utils.h" +#include "utils_internal.hpp" using namespace mscclpp; @@ -115,7 +114,7 @@ UniqueId Bootstrap::Impl::getUniqueId() const { UniqueId 
Bootstrap::Impl::createUniqueId() { netInit(""); - MSCCLPPTHROW(getRandomData(&uniqueId_.magic, sizeof(uniqueId_.magic))); + getRandomData(&uniqueId_.magic, sizeof(uniqueId_.magic)); std::memcpy(&uniqueId_.addr, &netIfAddr_, sizeof(mscclppSocketAddress)); bootstrapCreateRoot(); return getUniqueId(); diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 568eaa50..19df1e49 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -13,10 +13,10 @@ #include #include -#include "checks.h" -#include "config.h" +#include "checks_internal.hpp" +#include "config.hpp" #include "debug.h" -#include "utils.h" +#include "utils_internal.hpp" static mscclppResult_t socketProgressOpt(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset, int block, int* closed) { @@ -112,12 +112,12 @@ static int findInterfaces(const char* prefixList, char* names, union mscclppSock #ifdef ENABLE_TRACE char line[SOCKET_NAME_MAXLEN + 1]; #endif - struct netIf userIfs[MAX_IFS]; + struct mscclpp::netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[0] == '^'; if (searchNot) prefixList++; bool searchExact = prefixList && prefixList[0] == '='; if (searchExact) prefixList++; - int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); + int nUserIfs = mscclpp::parseStringList(prefixList, userIfs, MAX_IFS); int found = 0; struct ifaddrs *interfaces, *interface; @@ -142,7 +142,7 @@ static int findInterfaces(const char* prefixList, char* names, union mscclppSock } // check against user specified interfaces - if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { + if (!(mscclpp::matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } @@ -264,9 +264,9 @@ mscclppResult_t mscclppSocketGetAddrFromString(union mscclppSocketAddress* ua, c bool ipv6 = ip_port_pair[0] == '['; /* Construct the sockaddress structure */ if (!ipv6) { - struct netIf ni; + struct mscclpp::netIf ni; // parse 
: string, expect one pair - if (parseStringList(ip_port_pair, &ni, 1) != 1) { + if (mscclpp::parseStringList(ip_port_pair, &ni, 1) != 1) { WARN("Net : No valid : pair found"); return mscclppInvalidArgument; } @@ -422,13 +422,13 @@ mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSo static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) { static bool timeInitialized = false; - static mscclppTime_t initTime; + static mscclpp::TimePoint initTime; if (!timeInitialized) { timeInitialized = true; - initTime = getClock(); + initTime = mscclpp::getClock(); } - mscclppConfig* config = mscclppConfig::getInstance(); + mscclpp::Config* config = mscclpp::Config::getInstance(); time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); socklen_t socklen = sizeof(union mscclppSocketAddress); sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen); @@ -439,7 +439,7 @@ static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) { WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno); timeInitialized = false; return mscclppSystemError; - } else if (elapsedClock(getClock(), initTime) > acceptTimeout) { + } else if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) { WARN("socketTryAccept: exceeded timeout (%ld) sec", acceptTimeout); timeInitialized = false; return mscclppRemoteError; @@ -483,13 +483,13 @@ static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock) { static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) { static bool timeInitialized = false; - static mscclppTime_t initTime; + static mscclpp::TimePoint initTime; if (!timeInitialized) { timeInitialized = true; - initTime = getClock(); + initTime = mscclpp::getClock(); } - mscclppConfig* config = mscclppConfig::getInstance(); + mscclpp::Config* config = mscclpp::Config::getInstance(); time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); /* blocking/non-blocking connect() 
is determined by asyncFlag. */ @@ -502,7 +502,7 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) { sock->state = mscclppSocketStateConnectPolling; return mscclppSuccess; } else if (errno == ECONNREFUSED || errno == ETIMEDOUT) { - if (elapsedClock(getClock(), initTime) > acceptTimeout) { + if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) { WARN("socketStartConnect: exceeded timeout (%ld) sec", acceptTimeout); sock->state = mscclppSocketStateError; timeInitialized = false; @@ -522,13 +522,13 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) { static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) { static bool timeInitialized = false; - static mscclppTime_t initTime; + static mscclpp::TimePoint initTime; if (!timeInitialized) { timeInitialized = true; - initTime = getClock(); + initTime = mscclpp::getClock(); } - mscclppConfig* config = mscclppConfig::getInstance(); + mscclpp::Config* config = mscclpp::Config::getInstance(); time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); struct pollfd pfd; @@ -552,7 +552,7 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) { timeInitialized = false; sock->state = mscclppSocketStateConnected; } else if (ret == ECONNREFUSED || ret == ETIMEDOUT) { - if (elapsedClock(getClock(), initTime) > acceptTimeout) { + if (mscclpp::elapsedClock(mscclpp::getClock(), initTime) > acceptTimeout) { WARN("socketPollConnect: exceeded timeout (%ld) sec", acceptTimeout); sock->state = mscclppSocketStateError; return mscclppRemoteError; diff --git a/src/c_style_remnants.cc b/src/c_style_remnants.cc index 98b6273f..79b3cc2f 100644 --- a/src/c_style_remnants.cc +++ b/src/c_style_remnants.cc @@ -1,5 +1,4 @@ #include "api.h" -#include "config.h" #include "debug.h" #include "mscclpp.h" @@ -9,12 +8,6 @@ MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler) { return mscclppDebugSetLogHandler(handler); } 
-MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout) { - mscclppConfig* config = mscclppConfig::getInstance(); - config->setBootstrapConnectionTimeoutConfig(timeout); - return mscclppSuccess; -} - MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code) { switch (code) { case mscclppSuccess: diff --git a/src/channel.cc b/src/channel.cc index c7ffb907..d2ba3c34 100644 --- a/src/channel.cc +++ b/src/channel.cc @@ -3,7 +3,7 @@ #include "api.h" #include "checks_internal.hpp" #include "debug.h" -#include "utils.h" +#include "numa.hpp" namespace mscclpp { namespace channel { @@ -13,12 +13,12 @@ MSCCLPP_API_CPP DeviceChannelService::DeviceChannelService(Communicator& communi proxy_([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) { int cudaDevice; MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice)); - MSCCLPPTHROW(getDeviceNumaNode(cudaDevice, &deviceNumaNode)); + deviceNumaNode = getDeviceNumaNode(cudaDevice); } MSCCLPP_API_CPP void DeviceChannelService::bindThread() { if (deviceNumaNode >= 0) { - MSCCLPPTHROW(numaBind(deviceNumaNode)); + numaBind(deviceNumaNode); INFO(MSCCLPP_INIT, "NUMA node of DeviceChannelService proxy thread is set to %d", deviceNumaNode); } } diff --git a/src/communicator.cc b/src/communicator.cc index 7e0d2414..6df2de4d 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -8,7 +8,7 @@ #include "connection.hpp" #include "debug.h" #include "registered_memory.hpp" -#include "utils.h" +#include "utils_internal.hpp" namespace mscclpp { diff --git a/src/config.cc b/src/config.cc index e4640216..b7aca1a0 100644 --- a/src/config.cc +++ b/src/config.cc @@ -1,9 +1,11 @@ -#include "config.h" +#include "config.hpp" -mscclppConfig mscclppConfig::_instance; +namespace mscclpp { +Config Config::instance_; -mscclppConfig* mscclppConfig::getInstance() { return &_instance; } +Config* Config::getInstance() { return &instance_; } -time_t mscclppConfig::getBootstrapConnectionTimeoutConfig() 
{ return bootstrapConnectionTimeout; } +time_t Config::getBootstrapConnectionTimeoutConfig() { return bootstrapConnectionTimeout; } -void mscclppConfig::setBootstrapConnectionTimeoutConfig(time_t timeout) { bootstrapConnectionTimeout = timeout; } +void Config::setBootstrapConnectionTimeoutConfig(time_t timeout) { bootstrapConnectionTimeout = timeout; } +} // namespace mscclpp diff --git a/src/debug.cc b/src/debug.cc index 9841a4b7..d1fdd2de 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -12,6 +12,8 @@ #include #include +#include + int mscclppDebugLevel = -1; static int pid = -1; static char hostname[1024]; @@ -100,7 +102,7 @@ void mscclppDebugInit() { } // Cache pid and hostname - getHostName(hostname, 1024, '.'); + strncpy(hostname, mscclpp::getHostName(1024, '.').c_str(), 1024); pid = getpid(); /* Parse and expand the MSCCLPP_DEBUG_FILE path and diff --git a/src/ib.cc b/src/ib.cc index 4128648b..c6d57602 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include diff --git a/src/include/align.h b/src/include/align.h deleted file mode 100644 index 981d943d..00000000 --- a/src/include/align.h +++ /dev/null @@ -1,41 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2019, NVIDIA CORPORATION. All rights reserved. 
- * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef NCCL_ALIGN_H_ -#define NCCL_ALIGN_H_ - -#define DIVUP(x, y) (((x) + (y)-1) / (y)) - -#define ROUNDUP(x, y) (DIVUP((x), (y)) * (y)) - -#define ALIGN_SIZE(size, align) size = ((size + (align)-1) / (align)) * (align); - -#if !__CUDA_ARCH__ -#ifndef __host__ -#define __host__ -#endif -#ifndef __device__ -#define __device__ -#endif -#endif - -template -__host__ __device__ constexpr Z divUp(X x, Y y) { - return (x + y - 1) / y; -} - -template -__host__ __device__ constexpr Z roundUp(X x, Y y) { - return (x + y - 1) - (x + y - 1) % y; -} - -// assumes second argument is a power of 2 -template -__host__ __device__ constexpr Z alignUp(X x, int a) { - return (x + a - 1) & Z(-a); -} - -#endif diff --git a/src/include/checks.h b/src/include/checks.h deleted file mode 100644 index c877cdea..00000000 --- a/src/include/checks.h +++ /dev/null @@ -1,188 +0,0 @@ -/************************************************************************* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved. 
- * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_CHECKS_H_ -#define MSCCLPP_CHECKS_H_ - -#include - -#include "debug.h" - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - WARN("Cuda failure '%s'", cudaGetErrorString(err)); \ - return mscclppUnhandledCudaError; \ - } \ - } while (false) - -#define CUDACHECKNORET(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - WARN("Cuda failure '%s'", cudaGetErrorString(err)); \ - return; \ - } \ - } while (false) - -#define CUDACHECKGOTO(cmd, res, label) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - WARN("Cuda failure '%s'", cudaGetErrorString(err)); \ - res = mscclppUnhandledCudaError; \ - goto label; \ - } \ - } while (false) - -// Report failure but clear error and continue -#define CUDACHECKIGNORE(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - INFO(MSCCLPP_ALL, "%s:%d Cuda failure '%s'", __FILE__, __LINE__, cudaGetErrorString(err)); \ - (void)cudaGetLastError(); \ - } \ - } while (false) - -#include -// Check system calls -#define SYSCHECK(call, name) \ - do { \ - int retval; \ - SYSCHECKVAL(call, name, retval); \ - } while (false) - -#define SYSCHECKVAL(call, name, retval) \ - do { \ - SYSCHECKSYNC(call, name, retval); \ - if (retval == -1) { \ - WARN("Call to " name " failed : %s", strerror(errno)); \ - return mscclppSystemError; \ - } \ - } while (false) - -#define SYSCHECKSYNC(call, name, retval) \ - do { \ - retval = call; \ - if (retval == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) { \ - INFO(MSCCLPP_ALL, "Call to " name " returned %s, retrying", strerror(errno)); \ - } else { \ - break; \ - } \ - } while (true) - -#define SYSCHECKGOTO(statement, res, label) \ - do { \ - if ((statement) == -1) { \ - /* Print the back trace*/ \ - res = mscclppSystemError; \ - 
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - goto label; \ - } \ - } while (0); - -#define NEQCHECK(statement, value) \ - do { \ - if ((statement) != value) { \ - /* Print the back trace*/ \ - INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \ - return mscclppSystemError; \ - } \ - } while (0); - -#define NEQCHECKGOTO(statement, value, res, label) \ - do { \ - if ((statement) != value) { \ - /* Print the back trace*/ \ - res = mscclppSystemError; \ - INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - goto label; \ - } \ - } while (0); - -#define EQCHECK(statement, value) \ - do { \ - if ((statement) == value) { \ - /* Print the back trace*/ \ - INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \ - return mscclppSystemError; \ - } \ - } while (0); - -#define EQCHECKGOTO(statement, value, res, label) \ - do { \ - if ((statement) == value) { \ - /* Print the back trace*/ \ - res = mscclppSystemError; \ - INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - goto label; \ - } \ - } while (0); - -// Propagate errors up -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - return res; \ - } \ - } while (0); - -#define MSCCLPPCHECKGOTO(call, res, label) \ - do { \ - res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - goto label; \ - } \ - } while (0); - -#define MSCCLPPWAIT(call, cond, abortFlagPtr) \ - do { \ - volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); 
\ - return mscclppInternalError; \ - } \ - if (tmpAbortFlag) NEQCHECK(*tmpAbortFlag, 0); \ - } while (!(cond)); - -#define MSCCLPPWAITGOTO(call, cond, abortFlagPtr, res, label) \ - do { \ - volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \ - res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - goto label; \ - } \ - if (tmpAbortFlag) NEQCHECKGOTO(*tmpAbortFlag, 0, res, label); \ - } while (!(cond)); - -#define MSCCLPPCHECKTHREAD(a, args) \ - do { \ - if (((args)->ret = (a)) != mscclppSuccess && (args)->ret != mscclppInProgress) { \ - INFO(MSCCLPP_INIT, "%s:%d -> %d [Async thread]", __FILE__, __LINE__, (args)->ret); \ - return args; \ - } \ - } while (0) - -#define CUDACHECKTHREAD(a) \ - do { \ - if ((a) != cudaSuccess) { \ - INFO(MSCCLPP_INIT, "%s:%d -> %d [Async thread]", __FILE__, __LINE__, args->ret); \ - args->ret = mscclppUnhandledCudaError; \ - return args; \ - } \ - } while (0) - -#endif diff --git a/src/include/checks_internal.hpp b/src/include/checks_internal.hpp index 8d5604fd..ea8ed065 100644 --- a/src/include/checks_internal.hpp +++ b/src/include/checks_internal.hpp @@ -3,6 +3,8 @@ #include +#include "debug.h" + #define MSCCLPPTHROW(call) \ do { \ mscclppResult_t res = call; \ @@ -18,4 +20,80 @@ } \ } while (false) +// Check system calls +#define SYSCHECK(call, name) \ + do { \ + int retval; \ + SYSCHECKVAL(call, name, retval); \ + } while (false) + +#define SYSCHECKVAL(call, name, retval) \ + do { \ + SYSCHECKSYNC(call, name, retval); \ + if (retval == -1) { \ + WARN("Call to " name " failed : %s", strerror(errno)); \ + return mscclppSystemError; \ + } \ + } while (false) + +#define SYSCHECKSYNC(call, name, retval) \ + do { \ + retval = call; \ + if (retval == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) { \ + INFO(MSCCLPP_ALL, "Call to " name " returned %s, retrying", strerror(errno)); \ + } else { \ + break; \ + } 
\ + } while (true) + +#define SYSCHECKGOTO(statement, res, label) \ + do { \ + if ((statement) == -1) { \ + /* Print the back trace*/ \ + res = mscclppSystemError; \ + INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ + goto label; \ + } \ + } while (0); + +#define EQCHECK(statement, value) \ + do { \ + if ((statement) == value) { \ + /* Print the back trace*/ \ + INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, mscclppSystemError); \ + return mscclppSystemError; \ + } \ + } while (0); + +#define EQCHECKGOTO(statement, value, res, label) \ + do { \ + if ((statement) == value) { \ + /* Print the back trace*/ \ + res = mscclppSystemError; \ + INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ + goto label; \ + } \ + } while (0); + +// Propagate errors up +#define MSCCLPPCHECK(call) \ + do { \ + mscclppResult_t res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ + return res; \ + } \ + } while (0); + +#define MSCCLPPCHECKGOTO(call, res, label) \ + do { \ + res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + if (mscclppDebugNoWarn == 0) INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ + goto label; \ + } \ + } while (0); + #endif diff --git a/src/include/communicator.hpp b/src/include/communicator.hpp index 6461eb13..36f1d4a2 100644 --- a/src/include/communicator.hpp +++ b/src/include/communicator.hpp @@ -9,7 +9,6 @@ #include #include "ib.hpp" -#include "mscclpp.h" namespace mscclpp { diff --git a/src/include/config.h b/src/include/config.hpp similarity index 54% rename from src/include/config.h rename to src/include/config.hpp index 60fe3e3e..019ce4d8 100644 --- a/src/include/config.h +++ b/src/include/config.hpp @@ -3,20 +3,22 @@ #include -class mscclppConfig { +namespace mscclpp { +class Config { public: time_t 
bootstrapConnectionTimeout = 30; - static mscclppConfig* getInstance(); + static Config* getInstance(); time_t getBootstrapConnectionTimeoutConfig(); void setBootstrapConnectionTimeoutConfig(time_t timeout); private: - mscclppConfig() = default; - mscclppConfig(const mscclppConfig&) = delete; - mscclppConfig& operator=(const mscclppConfig&) = delete; + Config() = default; + Config(const Config&) = delete; + Config& operator=(const Config&) = delete; - static mscclppConfig _instance; + static Config instance_; }; +} // namespace mscclpp #endif // end include guard diff --git a/src/include/debug.h b/src/include/debug.h index 64b37297..7131cdc6 100644 --- a/src/include/debug.h +++ b/src/include/debug.h @@ -46,7 +46,6 @@ extern int mscclppDebugLevel; extern uint64_t mscclppDebugMask; extern pthread_mutex_t mscclppDebugLock; extern FILE* mscclppDebugFile; -extern mscclppResult_t getHostName(char* hostname, int maxlen, const char delim); void mscclppDebugDefaultLogHandler(const char* msg); void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char* filefunc, int line, const char* fmt, diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 082ab059..0ab886a6 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -1,237 +1,12 @@ #ifndef MSCCLPP_H_ #define MSCCLPP_H_ -#define MSCCLPP_MAJOR 0 -#define MSCCLPP_MINOR 1 -#define MSCCLPP_PATCH 0 -#define MSCCLPP_VERSION (MSCCLPP_MAJOR * 10000 + MSCCLPP_MINOR * 100 + MSCCLPP_PATCH) - -// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered. -// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem. 
-#define MSCCLPP_PROXY_FIFO_SIZE 128 -#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4 - -#include - -#include -// #includa +// TODO: deprecate this file #ifdef __cplusplus extern "C" { #endif -struct alignas(16) mscclppDevConnSignalEpochId { - // every signal(), increaments this and either: - // 1) proxy thread pushes it to the remote peer's localSignalEpochId->proxy - // 2) gpu thread directly writes it to remoteSignalEpochId->device - uint64_t device; - // signal() function triggers the cpu proxy thread to write to it - uint64_t proxy; -}; - -using mscclppBufferHandle_t = uint32_t; - -/*************************************************************************************************************** - * A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand. - * The communication API is one-sided meaning that for every single data transfer, only one side - * needs to execute unlike a two-sided communication stack such as NCCL where both sides - * need to execute a send and a receive instruction, respectively, for every transfer. - * - * A connection is uniquely identified by the (remoteRank, tag) pair at an endpoint. - * The two endpoints register buffers of the same size with the connection. - * - * The endpoints provide the remoteRank, tag, and the buffer when registering a connection with msccppConnect(). - * - * mscllppConnectionSetup() sets up all the registered connections. - * - *************************************************************************************************************** - * A proxy thread running on the CPU is necessary to perform transfers using InfiniBand or the DMA engine. - * The current implementation uses a single proxy thread per context - one IB connection or DMA engine per node. - * Thus multiple threadblocks using different connections might use the same CPU proxy thread. 
- * - * Before using any of functionality of connections, mscclppProxyLaunch needs to be called to spawn the - * proxy threads. There are currently two types of connections: - * - * P2P via NVLink: the DMA engine can perform the copy between the buffers. DMA engine has higher latency - * but has a higher bandwidth and costs no compute cycles on the GPU. - * - * InfiniBand: the RDMA engine copies the data over MLX devices. - * - *************************************************************************************************************** - * At the runtime, a GPU kernel has access to a mscclppDevConn object that provides the following functions: - * - * put(): [non-blocking] the sender initiates a data transfer to the receiver. - * - * signal(): [non-blocking] the sender signals the receiver that data is ready to be consumed. - * - * flush(): [blocking] the sender waits for all the data transfers to complete - * - * wait(): [blocking] the receiver waits on the signal() to start reading the data. - * - * The sender should not reuse the buffer till the flush() returns. - * The receiver should only access the data after the wait() returns. - * - * putWithSignal(): the sender initiates a data transfer and signals the receiver that data is ready to be consumed. - * This is an optimized version of a put() followed by a signal(). - * - * These functions hide the complexity of syncrhonization between the two GPUs and the CPU proxy thread. 
- * Example: - * - * // sender GPU - * devConn.put(data1) - * // not OK to write to data1 - * devConn.put(data2) - * // not OK to write to data1, data2 - * devConn.put(data3) // receiver GPU - * // not OK to write to data1, data2, data3 // not OK to read data1, data2, data3 - * devConn.signal() -------------------------------> devConn.wait() - * // not OK to write to data1, data2, data3 // OK to read data1, data2, data3 - * devConn.flush() - * // OK to write to data1, data2, data3 - * - * - * The two endpoint can concurrently use the same connection provided they are writing (puts) on different - * indices in the registered buffer. - **************************************************************************************************************/ -struct mscclppDevConn { -#ifdef __CUDACC__ - __forceinline__ __device__ void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) { - fifo.push(mscclppData, dstDataOffset, srcDataOffset, dataSize); - } - - __forceinline__ __device__ void put(uint64_t dataOffset, uint64_t dataSize) { put(dataOffset, dataOffset, dataSize); } - - __forceinline__ __device__ void signal() { - epochIncrement(); - fifo.push(mscclppFlag, 0, 0, 1); - } - - __forceinline__ __device__ void putWithSignal(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) { - epochIncrement(); - fifo.push(mscclppData | mscclppFlag, dstDataOffset, srcDataOffset, dataSize); - } - - __forceinline__ __device__ void putWithSignal(uint64_t dataOffset, uint64_t dataSize) { - putWithSignal(dataOffset, dataOffset, dataSize); - } - - __forceinline__ __device__ void putWithSignalAndFlush(uint64_t dstDataOffset, uint64_t srcDataOffset, - uint64_t dataSize) { - epochIncrement(); - uint64_t curFifoHead = fifo.push(mscclppData | mscclppFlag | mscclppSync, dstDataOffset, srcDataOffset, dataSize); - while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 && - *(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead) - ; 
- } - - __forceinline__ __device__ void putWithSignalAndFlush(uint64_t dataOffset, uint64_t dataSize) { - putWithSignalAndFlush(dataOffset, dataOffset, dataSize); - } - - __forceinline__ __device__ void flush() { - uint64_t curFifoHead = fifo.push(mscclppSync, 0, 0, 1); - // we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail - // to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0. - while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 && - *(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead) - ; - } - - // Version that uses the SM directly to do the copy, instead of using the proxy thread like the functions above. - __forceinline__ __device__ void putDirect(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize, - uint32_t threadId, uint32_t numThreads) { - uint64_t* src = (uint64_t*)((char*)localBuff + srcDataOffset); - uint64_t* dst = (uint64_t*)((char*)remoteBuff + dstDataOffset); - // assume the memory is aligned to 8 bytes - size_t nElem = - dataSize % sizeof(uint64_t) ? (dataSize + sizeof(uint64_t)) / sizeof(uint64_t) : dataSize / sizeof(uint64_t); - for (size_t i = threadId; i < nElem; i += numThreads) { - dst[i] = src[i]; - } - } - - __forceinline__ __device__ void putDirect(uint64_t dataOffset, uint64_t dataSize, uint32_t threadId, - uint32_t numThreads) { - putDirect(dataOffset, dataOffset, dataSize, threadId, numThreads); - } - - __forceinline__ __device__ void signalDirect() { - // This fence ensures that the writes from a preceding putDirect() are visible on the peer GPU before the - // incremented epoch id is visible. 
- __threadfence_system(); - epochIncrement(); - *(volatile uint64_t*)&(remoteSignalEpochId->device) = localSignalEpochId->device; - } - - __forceinline__ __device__ void wait() { - (*waitEpochId) += 1; - while (*(volatile uint64_t*)&(localSignalEpochId->proxy) < (*waitEpochId)) - ; - } - - __forceinline__ __device__ void waitDirect() { - (*waitEpochId) += 1; - while (*(volatile uint64_t*)&(localSignalEpochId->device) < (*waitEpochId)) - ; - } - - __forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(localSignalEpochId->device) += 1; } - -#endif // __CUDACC__ - - // this is a concurrent fifo which is multiple threads from the device - // can produce for and the sole proxy thread consumes it. - struct mscclppConcurrentFifo fifo; - - int remoteRank; - int tag; - - // my local buffer - void* localBuff; - - struct mscclppDevConnSignalEpochId* localSignalEpochId; - // used by the signal() function directly from gpu - struct mscclppDevConnSignalEpochId* remoteSignalEpochId; - - // every wait(), increaments this and then the gpu waits for either: - // 1) localSignalEpochId->proxy to be >= this in case of a proxy thread - // 2) remoteSignalEpochId->device to be >= this in case of a gpu thread - uint64_t* waitEpochId; - - // my remote peer's buffer. 
only non-NULL with gpu's direct access - // gpu can directly write into it - void* remoteBuff; -}; - -// Host interface for mscclppDevCon functionality -struct mscclppHostConn { - virtual ~mscclppHostConn() = default; - virtual void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) = 0; - virtual void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, - uint64_t dataSize) = 0; - virtual void signal() = 0; - virtual void wait() = 0; - virtual void flush() = 0; -}; - -typedef struct mscclppComm* mscclppComm_t; -typedef struct mscclppDevConn mscclppDevConn_t; -typedef struct mscclppHostConn mscclppHostConn_t; - -#define MSCCLPP_UNIQUE_ID_BYTES 128 -typedef struct { - char internal[MSCCLPP_UNIQUE_ID_BYTES]; -} mscclppUniqueId; - -struct mscclppRegisteredMemoryP2P { - void* remoteBuff; - const void* IbMr; -}; - -struct mscclppRegisteredMemory { - std::vector p2p; -}; - /* Error type */ typedef enum { mscclppSuccess = 0, @@ -245,68 +20,6 @@ typedef enum { mscclppNumResults = 8 } mscclppResult_t; -/* Create a unique ID for communication. Only needs to be called by one process. - * Use with mscclppCommInitRankFromId(). - * All processes need to provide the same ID to mscclppCommInitRankFromId(). - * - * Outputs: - * uniqueId: the unique ID to be created - */ -mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* uniqueId); - -/* Transport Types */ -typedef enum { - mscclppTransportP2P = 0, - mscclppTransportSHM = 1, // TODO(chhwang): not implemented yet - mscclppTransportIB = 2, -} mscclppTransport_t; - -/* Initialize a communicator. nranks processes with rank 0 to nranks-1 need to call this function. 
- * - * Outputs: - * comm: the communicator to be initialized - * - * Inputs: - * nranks: number of ranks in the communicator - * ipPortPair: a string of the form "ip:port" that represents the address of the root process - * rank: rank of the calling process - */ -mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank); - -/* Initialize a communicator from a given mscclppUniqueId. Same as mscclppCommInitRank() except that - * id is provided by the user by calling mscclppGetUniqueId() - * - * Outputs: - * comm: the communicator to be initialized - * - * Inputs: - * nranks: number of ranks in the communicator - * id: the unique ID to be used for communication - * rank: rank of the calling process - */ -mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank); - -/* Ring-based AllGather through the bootstrap socket. - * - * Outputs: - * comm: the communicator - * - * Inputs: - * data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r` - * size: data size per rank - */ -mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size); - -/* A no-op function that is used to synchronize all processes via a bootstrap allgather*/ -mscclppResult_t mscclppBootstrapBarrier(mscclppComm_t comm); - -/* Destroy a communicator. - * - * Inputs: - * comm: the communicator to be destroyed - */ -mscclppResult_t mscclppCommDestroy(mscclppComm_t comm); - /* Return the string for the given error code. * * Output: @@ -317,136 +30,6 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm); */ const char* mscclppGetErrorString(mscclppResult_t result); -/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection - * is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection - * from rank i to remote rank j needs to have a counterpart from rank j to rank i. 
- * Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages - * and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has - * security risks if the devConn's accesses are given to a malicious process. - * - * Inputs: - * comm: the communicator - * remoteRank: the rank of the remote process - * tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be - * used to identify the connection inside a GPU kernel. - * localBuff: the local send/receive buffer - * buffSize: the size of the local buffer - * transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB) - * ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P. - */ -mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize, - mscclppTransport_t transportType, const char* ibDev = 0); - -/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection - * is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection - * from rank i to remote rank j needs to have a counterpart from rank j to rank i. - * Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages - * and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has - * security risks if the devConn's accesses are given to a malicious process. - * - * This version does not register a buffer. Buffers should instead be registered with mscclppRegisterBuffer(). - * - * Inputs: - * comm: the communicator - * remoteRank: the rank of the remote process - * tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be - * used to identify the connection inside a GPU kernel. 
- * transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB) - * ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P. - */ -mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, - mscclppTransport_t transportType, const char* ibDev = 0); - -/* Register a buffer for use with a connection. - * - * Inputs: - * comm: the communicator - * connIdx: the index of the connection by order of calls to mscclppConnect/mscclppConnectWithoutBuffer - * localBuff: the local send/receive buffer - * buffSize: the size of the local buffer - * - * Outputs: - * handle: a handle to the buffer registration - */ -mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, uint64_t buffSize, - mscclppBufferHandle_t* handle); - -/* Establish all connections declared by mscclppConnect(). This function must be called after all mscclppConnect() - * calls are made. This function ensures that all remote ranks are ready to communicate when it returns. - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm); - -/* Return an array of mscclppDevConn_t and the number of connections created by mscclppConnectionSetup(). - * The order of connections matches the order of mscclppConnect() calls. - * - * Outputs: - * devConns: the array of mscclppDevConn_t. Each mscclppDevConn_t corresponds to a mscclppConnect() call in the - * order of the calls. - * nConns: the number of connections - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns); - -/* Return the mscclppDevConn_t corresponding to a given tag and a remoteRank. 
- * - * Outputs: - * devConn: the mscclppDevConn_t corresponding to the given tag - * - * Inputs: - * comm: the communicator - * tag: the tag of the connection - * remoteRank: the remoteRank of the connection - */ -mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn); - -/* Launch proxy threads for all connections created by mscclppConnectionSetup(). This function is supposed to be called - * before starting a kernel that uses mscclppDevConn_t. Up to two proxy threads are launched for each (GPU + IB) pair - * (one for P2P NVLink and one for InfiniBand). - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm); - -/* Stop all proxy threads. - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppProxyStop(mscclppComm_t comm); - -/* Return the rank of the calling process. - * - * Outputs: - * rank: the rank of the calling process - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppCommRank(mscclppComm_t comm, int* rank); - -/* Return the number of ranks of the communicator. - * - * Outputs: - * size: the number of ranks of the communicator - * - * Inputs: - * comm: the communicator - */ -mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size); - -/* Set the timeout for the bootstrap connection. - * - * Inputs: - * timeout: the timeout in seconds - */ -mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout); - /* Log handler type which is a callback function for * however user likes to handle the log messages. Once set, * the logger will just call this function with msg. @@ -467,33 +50,6 @@ void mscclppDefaultLogHandler(const char* msg); */ mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler); -/* Register a buffer for RDMA. 
- * - * Outputs: - * regMem: the registered memory - * - * Inputs: - * comm: the communicator - * local_memory: the local buffer to be registered - * size: the size of the buffer - */ -mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size, - mscclppRegisteredMemory* regMem); - -/* Write to a registered buffer. - * - * Inputs: - * comm: the communicator - * regMem: the registered memory - * srcBuff: the source buffer - * size: the size of the buffer - * srcOffset: the offset of the source buffer - * dstOffset: the offset of the destination buffer - * stream: the CUDA stream - */ -mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff, - size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream); - #ifdef __cplusplus } // end extern "C" #endif diff --git a/src/include/mscclppfifo.h b/src/include/mscclppfifo.h deleted file mode 100644 index 030220dd..00000000 --- a/src/include/mscclppfifo.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef MSCCLPPFIFO_H_ -#define MSCCLPPFIFO_H_ - -#include - -#ifdef __cplusplus -extern "C" { -#endif - -typedef enum : uint64_t { mscclppData = 0x1, mscclppFlag = 0x2, mscclppSync = 0x4 } mscclppTriggerType_t; - -#define MSCCLPP_BITS_SIZE 32 -#define MSCCLPP_BITS_OFFSET 32 -#define MSCCLPP_BITS_TYPE 3 -#define MSCCLPP_BITS_CONNID 10 - -// this is the basic structure of each work element in the fifo -// the summation of number of bits must be 128 or less -union alignas(16) mscclppTrigger { - uint64_t value[2]; - struct { - // first 64 bits: value[0] - uint64_t dataSize : MSCCLPP_BITS_SIZE; - uint64_t srcDataOffset : MSCCLPP_BITS_OFFSET; - uint64_t : (64 - MSCCLPP_BITS_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment - // second 64 bits: value[1] - uint64_t dstDataOffset : MSCCLPP_BITS_OFFSET; - uint64_t connId : MSCCLPP_BITS_CONNID; - uint64_t type : MSCCLPP_BITS_TYPE; - uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_CONNID - 
MSCCLPP_BITS_TYPE); // ensure 64-bit alignment - } fields; -}; - -typedef mscclppTrigger* mscclppTrigger_t; - -/* This is a concurrent fifo where multiple device threads can push mscclppTrigger work elements to - * and a single host proxy thread consumes these work elements. There is a head pointer allocated on device - * which starts with 0 and goes to 2^64-1 which is almost infinity. There are two copies of tail, one - * that is on the deivce (triggerFifoTail) and another that is on host (proxyState->fifoTailHost). - * The host always has the "true" tail and occasionally, pushes it to the copy on the device. - * Therefore, most of the time, the device has a stale version. The invariants are: - * triggerFifoTail <= proxyState->fifoTailHost <= triggerFifoHead. - * push() function increments triggerFifoHead, proxyState->fifoTailHost is updated in proxy.cc:mscclppProxyService - * and it occasionally flushes it to triggerFifoTail via a cudaMemcpyAsync. - * - * Why duplicating the tail is a good idea? The fifo is large engouh and we do not need frequent updates - * for the tail as there is usually enough space for device threads to push their work into. 
- */ -struct mscclppConcurrentFifo { -#ifdef __CUDACC__ - - __forceinline__ __device__ uint64_t push(uint64_t type, uint64_t dstDataOffset, uint64_t srcDataOffset, - uint64_t dataSize) { - uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->triggerFifoHead, 1); - while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->triggerFifoTail)) - ; - while (*(volatile uint64_t*)&this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0) - ; - uint64_t* valptr = (uint64_t*)&(this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE].value); - asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(valptr), - "l"((srcDataOffset << MSCCLPP_BITS_SIZE) + dataSize), - "l"((((type << MSCCLPP_BITS_CONNID) + this->connId) << MSCCLPP_BITS_OFFSET) + dstDataOffset)); - return curFifoHead; - } - -#endif // __CUDACC__ - mscclppTrigger* triggerFifo; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements - uint64_t* triggerFifoTail; // Allocated on device. proxyState->fifoTailHost is the true tail on host and pused - // occasionally to device - uint64_t* triggerFifoHead; // Allocated on device. 
Only accessed by device - int connId; -}; - -#ifdef __cplusplus -} // end extern "C" -#endif - -#endif // MSCCLPPFIFO_H_ diff --git a/src/include/numa.hpp b/src/include/numa.hpp new file mode 100644 index 00000000..680d9bae --- /dev/null +++ b/src/include/numa.hpp @@ -0,0 +1,11 @@ +#ifndef MSCCLPP_NUMA_HPP_ +#define MSCCLPP_NUMA_HPP_ + +namespace mscclpp { + +int getDeviceNumaNode(int cudaDev); +void numaBind(int node); + +} // namespace mscclpp + +#endif // MSCCLPP_NUMA_HPP_ diff --git a/src/include/proxy.h b/src/include/proxy.h deleted file mode 100644 index 17e92dfd..00000000 --- a/src/include/proxy.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef MSCCLPP_PROXY_H_ -#define MSCCLPP_PROXY_H_ - -#include -#include - -#include - -#include "comm.h" -#include "mscclpp.h" - -#define MSCCLPP_PROXY_MAX_NUM (MSCCLPP_IB_MAX_DEVS + 1) // One is for a P2P proxy. - -typedef enum { - MSCCLPP_PROXY_RUN_STATE_IDLE = 0, - MSCCLPP_PROXY_RUN_STATE_RUNNING, - MSCCLPP_PROXY_RUN_STATE_EXITING, -} mscclppProxyRunState_t; - -struct mscclppProxyFifo { - mscclppResult_t create(); - mscclppResult_t destroy(); - mscclppResult_t poll(mscclppTrigger* trigger); - mscclppResult_t pop(); - mscclppResult_t flushTail(bool sync = false); - - // fifo cudaHostCalloc'ed that is produced by device and consumed by host - mscclppTrigger* triggerFifo; -#if defined(MSCCLPP_USE_GDRCOPY) - mscclppTrigger* triggerFifoDev; - void* triggerFifoDesc; -#endif - // allocated on the device and only accessed by the device - uint64_t* fifoHead; - - // allocated on the device. Read-only by device, write-only by host - uint64_t* fifoTailDev; -#if defined(MSCCLPP_USE_GDRCOPY) - uint64_t* fifoTailDevHostPtr; - void* fifoTailDesc; -#endif - // allocated on the host. Only accessed by the host. This is a copy of the - // value pointed to by fifoTailDev and the invariant is that - // *fifoTailDev <= fifoTailHost. 
Meaning that host's copy of tail is - // always ahead of the device's copy and host updates the device's copy - // only when it is needed. Therefore, fifoTailHost is the "true" tail - // and fifoTailDev is a "stale" tail. See proxy.cc to undertand how - // these updates are pushed to the device. - uint64_t fifoTailHost; - - // for transferring fifo tail - cudaStream_t stream; -}; - -struct mscclppProxyState { - mscclppTransport_t transportType; - pthread_t thread; - mscclppProxyRunState_t run; - - int numaNodeToBind; - mscclpp::IbCtx* ibContext; // For IB connection only - cudaStream_t p2pStream; // for P2P DMA engine only - - struct mscclppProxyFifo fifo; -}; - -mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); -mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm); - -#endif diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp index be32e25a..4c206110 100644 --- a/src/include/registered_memory.hpp +++ b/src/include/registered_memory.hpp @@ -8,7 +8,6 @@ #include "communicator.hpp" #include "ib.hpp" -#include "mscclpp.h" namespace mscclpp { diff --git a/src/include/utils.h b/src/include/utils.h deleted file mode 100644 index 5dbca982..00000000 --- a/src/include/utils.h +++ /dev/null @@ -1,71 +0,0 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. 
- * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_UTILS_H_ -#define MSCCLPP_UTILS_H_ - -#include -#include - -#include - -#include "mscclpp.h" - -// int mscclppCudaCompCap(); - -// PCI Bus ID <-> int64 conversion functions -mscclppResult_t int64ToBusId(int64_t id, char* busId); -mscclppResult_t busIdToInt64(const char* busId, int64_t* id); - -mscclppResult_t getBusId(int cudaDev, int64_t* busId); -mscclppResult_t getDeviceNumaNode(int cudaDev, int* numaNode); - -mscclppResult_t getHostName(char* hostname, int maxlen, const char delim); -uint64_t getHash(const char* string, int n); -uint64_t getHostHash(); -uint64_t getPidHash(); -mscclppResult_t getRandomData(void* buffer, size_t bytes); - -struct netIf { - char prefix[64]; - int port; -}; - -int parseStringList(const char* string, struct netIf* ifList, int maxList); -bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact); - -static long log2i(long n) { - long l = 0; - while (n >>= 1) l++; - return l; -} - -typedef std::chrono::steady_clock::time_point mscclppTime_t; -mscclppTime_t getClock(); -int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end); - -/* get any bytes of random data from /dev/urandom, return 0 if it succeeds; else - * return -1 */ -inline mscclppResult_t getRandomData(void* buffer, size_t bytes) { - mscclppResult_t ret = mscclppSuccess; - if (bytes > 0) { - const size_t one = 1UL; - FILE* fp = fopen("/dev/urandom", "r"); - if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) ret = mscclppSystemError; - if (fp) fclose(fp); - } - return ret; -} - -mscclppResult_t numaBind(int node); - -typedef struct bitmask* mscclppNumaState; - -mscclppResult_t getNumaState(mscclppNumaState* state); - -mscclppResult_t setNumaState(mscclppNumaState state); - -#endif diff --git a/src/include/utils_internal.hpp b/src/include/utils_internal.hpp new file 
mode 100644 index 00000000..16e6dfd6 --- /dev/null +++ b/src/include/utils_internal.hpp @@ -0,0 +1,52 @@ +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef MSCCLPP_UTILS_INTERNAL_HPP_ +#define MSCCLPP_UTILS_INTERNAL_HPP_ + +#include +#include +#include +#include + +namespace mscclpp { + +// PCI Bus ID <-> int64 conversion functions +std::string int64ToBusId(int64_t id); +int64_t busIdToInt64(const std::string busId); + +uint64_t getHash(const char* string, int n); +uint64_t getHostHash(); +uint64_t getPidHash(); +void getRandomData(void* buffer, size_t bytes); + +struct netIf { + char prefix[64]; + int port; +}; + +int parseStringList(const char* string, struct netIf* ifList, int maxList); +bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact); + +using TimePoint = std::chrono::steady_clock::time_point; +TimePoint getClock(); +int64_t elapsedClock(TimePoint start, TimePoint end); + +/* get any bytes of random data from /dev/urandom */ +inline void getRandomData(void* buffer, size_t bytes) { + if (bytes > 0) { + const size_t one = 1UL; + FILE* fp = fopen("/dev/urandom", "r"); + if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) { + throw Error("Failed to read random data", ErrorCode::SystemError); + } + if (fp) fclose(fp); + } +} + +} // namespace mscclpp + +#endif diff --git a/src/numa.cc b/src/numa.cc new file mode 100644 index 00000000..d844c2ed --- /dev/null +++ b/src/numa.cc @@ -0,0 +1,51 @@ +#include + +#include +#include +#include + +// Convert a logical cudaDev index to the NVML device minor number +static const std::string getBusId(int cudaDev) { + // On most systems, the PCI bus ID comes back as in the 0000:00:00.0 + // format. 
Still need to allocate proper space in case PCI domain goes + // higher. + char busIdChar[] = "00000000:00:00.0"; + MSCCLPP_CUDATHROW(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev)); + // we need the hex in lower case format + for (int i = 0; i < sizeof(busIdChar); i++) { + busIdChar[i] = std::tolower(busIdChar[i]); + } + return std::string(busIdChar); +} + +namespace mscclpp { + +int getDeviceNumaNode(int cudaDev) { + std::string busId = getBusId(cudaDev); + std::string file_str = "/sys/bus/pci/devices/" + busId + "/numa_node"; + std::ifstream file(file_str); + int numaNode; + if (file.is_open()) { + if (!(file >> numaNode)) { + throw Error("Failed to read NUMA node from file: " + file_str, ErrorCode::SystemError); + } + } else { + throw Error("Failed to open file: " + file_str, ErrorCode::SystemError); + } + return numaNode; +} + +void numaBind(int node) { + int totalNumNumaNodes = numa_num_configured_nodes(); + if (node < 0 || node >= totalNumNumaNodes) { + throw Error( + "Invalid NUMA node " + std::to_string(node) + ", must be between 0 and " + std::to_string(totalNumNumaNodes), + ErrorCode::InvalidUsage); + } + nodemask_t mask; + nodemask_zero(&mask); + nodemask_set_compat(&mask, node); + numa_bind_compat(&mask); +} + +} // namespace mscclpp diff --git a/src/proxy.cc b/src/proxy.cc index cfffb4be..f7c32dd4 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -5,7 +5,6 @@ #include #include "api.h" -#include "utils.h" namespace mscclpp { diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 66853d99..578af286 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -7,7 +7,7 @@ #include "api.h" #include "checks_internal.hpp" #include "debug.h" -#include "utils.h" +#include "utils_internal.hpp" namespace mscclpp { diff --git a/src/utils.cc b/src/utils.cc deleted file mode 100644 index 60a9efec..00000000 --- a/src/utils.cc +++ /dev/null @@ -1,264 +0,0 @@ 
-/************************************************************************* - * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "utils.h" - -#include -#include -#include -#include - -#include -#include - -#include "checks.h" - -// Get current Compute Capability -// int mscclppCudaCompCap() { -// int cudaDev; -// if (cudaGetDevice(&cudaDev) != cudaSuccess) return 0; -// int ccMajor, ccMinor; -// if (cudaDeviceGetAttribute(&ccMajor, cudaDevAttrComputeCapabilityMajor, cudaDev) != cudaSuccess) return 0; -// if (cudaDeviceGetAttribute(&ccMinor, cudaDevAttrComputeCapabilityMinor, cudaDev) != cudaSuccess) return 0; -// return ccMajor*10+ccMinor; -// } - -mscclppResult_t int64ToBusId(int64_t id, char* busId) { - sprintf(busId, "%04lx:%02lx:%02lx.%01lx", (id) >> 20, (id & 0xff000) >> 12, (id & 0xff0) >> 4, (id & 0xf)); - return mscclppSuccess; -} - -mscclppResult_t busIdToInt64(const char* busId, int64_t* id) { - char hexStr[17]; // Longest possible int64 hex string + null terminator. - int hexOffset = 0; - for (int i = 0; hexOffset < sizeof(hexStr) - 1; i++) { - char c = busId[i]; - if (c == '.' || c == ':') continue; - if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') || (c >= 'a' && c <= 'f')) { - hexStr[hexOffset++] = busId[i]; - } else - break; - } - hexStr[hexOffset] = '\0'; - *id = strtol(hexStr, NULL, 16); - return mscclppSuccess; -} - -// Convert a logical cudaDev index to the NVML device minor number -mscclppResult_t getBusId(int cudaDev, std::string* busId) { - // On most systems, the PCI bus ID comes back as in the 0000:00:00.0 - // format. Still need to allocate proper space in case PCI domain goes - // higher. 
- char busIdChar[] = "00000000:00:00.0"; - CUDACHECK(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev)); - // we need the hex in lower case format - for (int i = 0; i < sizeof(busIdChar); i++) { - busIdChar[i] = std::tolower(busIdChar[i]); - } - *busId = busIdChar; - return mscclppSuccess; -} - -mscclppResult_t getDeviceNumaNode(int cudaDev, int* numaNode) { - std::string busId; - MSCCLPPCHECK(getBusId(cudaDev, &busId)); - - std::string pci_str = "/sys/bus/pci/devices/" + busId + "/numa_node"; - FILE* file = fopen(pci_str.c_str(), "r"); - if (file == NULL) { - WARN("Could not open %s to detect the NUMA node for device %d", pci_str.c_str(), cudaDev); - return mscclppSystemError; - } - int ret = fscanf(file, "%d", numaNode); - if (ret != 1) { - WARN("Could not read NUMA node for device %d", cudaDev); - return mscclppSystemError; - } - fclose(file); - return mscclppSuccess; -} - -mscclppResult_t getHostName(char* hostname, int maxlen, const char delim) { - if (gethostname(hostname, maxlen) != 0) { - strncpy(hostname, "unknown", maxlen); - return mscclppSystemError; - } - int i = 0; - while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++; - hostname[i] = '\0'; - return mscclppSuccess; -} - -uint64_t getHash(const char* string, int n) { - // Based on DJB2a, result = result * 33 ^ char - uint64_t result = 5381; - for (int c = 0; c < n; c++) { - result = ((result << 5) + result) ^ string[c]; - } - return result; -} - -/* Generate a hash of the unique identifying string for this host - * that will be unique for both bare-metal and container instances - * Equivalent of a hash of; - * - * $(hostname)$(cat /proc/sys/kernel/random/boot_id) - * - * This string can be overridden by using the MSCCLPP_HOSTID env var. 
- */ -#define HOSTID_FILE "/proc/sys/kernel/random/boot_id" -uint64_t computeHostHash(void) { - char hostHash[1024]; - char* hostId; - - // Fall back is the full hostname if something fails - (void)getHostName(hostHash, sizeof(hostHash), '\0'); - int offset = strlen(hostHash); - - if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) { - INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId); - strncpy(hostHash, hostId, sizeof(hostHash)); - } else { - FILE* file = fopen(HOSTID_FILE, "r"); - if (file != NULL) { - char* p; - if (fscanf(file, "%ms", &p) == 1) { - strncpy(hostHash + offset, p, sizeof(hostHash) - offset - 1); - free(p); - } - } - fclose(file); - } - - // Make sure the string is terminated - hostHash[sizeof(hostHash) - 1] = '\0'; - - TRACE(MSCCLPP_INIT, "unique hostname '%s'", hostHash); - - return getHash(hostHash, strlen(hostHash)); -} - -uint64_t getHostHash(void) { - thread_local std::unique_ptr hostHash = std::make_unique(computeHostHash()); - return *hostHash; -} - -/* Generate a hash of the unique identifying string for this process - * that will be unique for both bare-metal and container instances - * Equivalent of a hash of; - * - * $$ $(readlink /proc/self/ns/pid) - */ -uint64_t getPidHash(void) { - char pname[1024]; - // Start off with our pid ($$) - sprintf(pname, "%ld", (long)getpid()); - int plen = strlen(pname); - int len = readlink("/proc/self/ns/pid", pname + plen, sizeof(pname) - 1 - plen); - if (len < 0) len = 0; - - pname[plen + len] = '\0'; - TRACE(MSCCLPP_INIT, "unique PID '%s'", pname); - - return getHash(pname, strlen(pname)); -} - -int parseStringList(const char* string, struct netIf* ifList, int maxList) { - if (!string) return 0; - - const char* ptr = string; - - int ifNum = 0; - int ifC = 0; - char c; - do { - c = *ptr; - if (c == ':') { - if (ifC > 0) { - ifList[ifNum].prefix[ifC] = '\0'; - ifList[ifNum].port = atoi(ptr + 1); - ifNum++; - ifC = 0; - } - while (c != ',' && c != '\0') c = *(++ptr); - } else if (c == 
',' || c == '\0') { - if (ifC > 0) { - ifList[ifNum].prefix[ifC] = '\0'; - ifList[ifNum].port = -1; - ifNum++; - ifC = 0; - } - } else { - ifList[ifNum].prefix[ifC] = c; - ifC++; - } - ptr++; - } while (ifNum < maxList && c); - return ifNum; -} - -static bool matchIf(const char* string, const char* ref, bool matchExact) { - // Make sure to include '\0' in the exact case - int matchLen = matchExact ? strlen(string) + 1 : strlen(ref); - return strncmp(string, ref, matchLen) == 0; -} - -static bool matchPort(const int port1, const int port2) { - if (port1 == -1) return true; - if (port2 == -1) return true; - if (port1 == port2) return true; - return false; -} - -bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact) { - // Make an exception for the case where no user list is defined - if (listSize == 0) return true; - - for (int i = 0; i < listSize; i++) { - if (matchIf(string, ifList[i].prefix, matchExact) && matchPort(port, ifList[i].port)) { - return true; - } - } - return false; -} - -mscclppResult_t numaBind(int node) { - int totalNumNumaNodes = numa_num_configured_nodes(); - if (node < 0 || node >= totalNumNumaNodes) { - WARN("Invalid NUMA node %d, must be between 0 and %d", node, totalNumNumaNodes); - return mscclppInvalidUsage; - } - nodemask_t mask; - nodemask_zero(&mask); - nodemask_set_compat(&mask, node); - numa_bind_compat(&mask); - return mscclppSuccess; -} - -mscclppResult_t getNumaState(mscclppNumaState* state) { - mscclppNumaState state_ = numa_get_run_node_mask(); - if (state_ == NULL) { - WARN("Failed to get NUMA node mask of the running process"); - return mscclppSystemError; - } - *state = state_; - return mscclppSuccess; -} - -mscclppResult_t setNumaState(mscclppNumaState state) { - if (state == NULL) { - WARN("Invalid NUMA state"); - return mscclppInvalidUsage; - } - numa_bind(state); - return mscclppSuccess; -} - -mscclppTime_t getClock() { return std::chrono::steady_clock::now(); } - -int64_t 
elapsedClock(mscclppTime_t start, mscclppTime_t end) { - return std::chrono::duration_cast(end - start).count(); -} diff --git a/src/utils_internal.cc b/src/utils_internal.cc new file mode 100644 index 00000000..f5190a0a --- /dev/null +++ b/src/utils_internal.cc @@ -0,0 +1,174 @@ +#include "utils_internal.hpp" + +#include +#include +#include + +#include +#include + +#include "checks_internal.hpp" + +namespace { +constexpr char HOSTID_FILE[32] = "/proc/sys/kernel/random/boot_id"; + +bool matchIf(const char* string, const char* ref, bool matchExact) { + // Make sure to include '\0' in the exact case + int matchLen = matchExact ? strlen(string) + 1 : strlen(ref); + return strncmp(string, ref, matchLen) == 0; +} + +bool matchPort(const int port1, const int port2) { + if (port1 == -1) return true; + if (port2 == -1) return true; + if (port1 == port2) return true; + return false; +} +} // namespace + +namespace mscclpp { +std::string int64ToBusId(int64_t id) { + char busId[20]; + std::sprintf(busId, "%04lx:%02lx:%02lx.%01lx", (id) >> 20, (id & 0xff000) >> 12, (id & 0xff0) >> 4, (id & 0xf)); + return std::string(busId); +} + +int64_t busIdToInt64(const std::string busId) { + char hexStr[17]; // Longest possible int64 hex string + null terminator. + int hexOffset = 0; + for (int i = 0; hexOffset < sizeof(hexStr) - 1 && i < busId.length(); ++i) { + char c = busId[i]; + if (c == '.' 
|| c == ':') continue; + if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') || (c >= 'a' && c <= 'f')) { + hexStr[hexOffset++] = busId[i]; + } else + break; + } + hexStr[hexOffset] = '\0'; + return std::strtol(hexStr, NULL, 16); +} + +uint64_t getHash(const char* string, int n) { + // Based on DJB2a, result = result * 33 ^ char + uint64_t result = 5381; + for (int c = 0; c < n; c++) { + result = ((result << 5) + result) ^ string[c]; + } + return result; +} + +/* Generate a hash of the unique identifying string for this host + * that will be unique for both bare-metal and container instances + * Equivalent of a hash of; + * + * $(hostname)$(cat /proc/sys/kernel/random/boot_id) + * + * This string can be overridden by using the MSCCLPP_HOSTID env var. + */ +uint64_t computeHostHash(void) { + char hostHash[1024]; + char* hostId; + + // Fall back is the full hostname if something fails + std::string hostName = getHostName(sizeof(hostHash), '\0'); + strncpy(hostHash, hostName.c_str(), sizeof(hostHash)); + int offset = strlen(hostHash); + + if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) { + INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId); + strncpy(hostHash, hostId, sizeof(hostHash)); + } else { + FILE* file = fopen(HOSTID_FILE, "r"); + if (file != nullptr) { + char* p; + if (fscanf(file, "%ms", &p) == 1) { + strncpy(hostHash + offset, p, sizeof(hostHash) - offset - 1); + free(p); + } + } + fclose(file); + } + + // Make sure the string is terminated + hostHash[sizeof(hostHash) - 1] = '\0'; + TRACE(MSCCLPP_INIT, "unique hostname '%s'", hostHash); + return getHash(hostHash, strlen(hostHash)); +} + +uint64_t getHostHash(void) { + thread_local std::unique_ptr hostHash = std::make_unique(computeHostHash()); + return *hostHash; +} + +/* Generate a hash of the unique identifying string for this process + * that will be unique for both bare-metal and container instances + * Equivalent of a hash of; + * + * $$ $(readlink /proc/self/ns/pid) + */ +uint64_t 
getPidHash(void) { + char pname[1024]; + // Start off with our pid ($$) + sprintf(pname, "%ld", (long)getpid()); + int plen = strlen(pname); + int len = readlink("/proc/self/ns/pid", pname + plen, sizeof(pname) - 1 - plen); + if (len < 0) len = 0; + + pname[plen + len] = '\0'; + TRACE(MSCCLPP_INIT, "unique PID '%s'", pname); + + return getHash(pname, strlen(pname)); +} + +int parseStringList(const char* string, netIf* ifList, int maxList) { + if (!string) return 0; + + const char* ptr = string; + + int ifNum = 0; + int ifC = 0; + char c; + do { + c = *ptr; + if (c == ':') { + if (ifC > 0) { + ifList[ifNum].prefix[ifC] = '\0'; + ifList[ifNum].port = atoi(ptr + 1); + ifNum++; + ifC = 0; + } + while (c != ',' && c != '\0') c = *(++ptr); + } else if (c == ',' || c == '\0') { + if (ifC > 0) { + ifList[ifNum].prefix[ifC] = '\0'; + ifList[ifNum].port = -1; + ifNum++; + ifC = 0; + } + } else if (ifC + 1 < static_cast<int>(sizeof(ifList[ifNum].prefix))) { // drop characters that would overrun prefix, keeping room for the terminator; assumes netIf::prefix is a fixed-size char array - TODO confirm + ifList[ifNum].prefix[ifC] = c; + ifC++; + } + ptr++; + } while (ifNum < maxList && c); + return ifNum; +} + +bool matchIfList(const char* string, int port, netIf* ifList, int listSize, bool matchExact) { + // Make an exception for the case where no user list is defined + if (listSize == 0) return true; + + for (int i = 0; i < listSize; i++) { + if (matchIf(string, ifList[i].prefix, matchExact) && matchPort(port, ifList[i].port)) { + return true; + } + } + return false; +} + +TimePoint getClock() { return std::chrono::steady_clock::now(); } + +int64_t elapsedClock(TimePoint start, TimePoint end) { + return std::chrono::duration_cast(end - start).count(); +} +} // namespace mscclpp diff --git a/test/allgather_test_cpp.cu b/test/allgather_test_cpp.cu index 60652a0f..f8254fc7 100644 --- a/test/allgather_test_cpp.cu +++ b/test/allgather_test_cpp.cu @@ -1,46 +1,45 @@ -#include - #include +#include #ifdef MSCCLPP_USE_MPI_FOR_TESTS #include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS +#endif // MSCCLPP_USE_MPI_FOR_TESTS +#include +#include +#include + #include #include #include -#include 
-#include #include -#include #include static int nranksPerNode = 8; // Propagate errors up -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ - return res; \ - } \ +#define MSCCLPPCHECK(call) \ + do { \ + mscclppResult_t res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ + return res; \ + } \ } while (0) // Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ +#define CUDACHECK(cmd) \ + do { \ + cudaError_t err = cmd; \ + if (err != cudaSuccess) { \ + printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + exit(EXIT_FAILURE); \ + } \ } while (false) // Measure current time in second. 
-static double getTime(void) -{ +static double getTime(void) { struct timespec tspec; if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { printf("clock_gettime failed\n"); @@ -52,28 +51,23 @@ static double getTime(void) __constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16]; __device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int remoteRank, - size_t nelemsPerGPU) -{ + size_t nelemsPerGPU) { // this allgather is really simple and implemented as an alltoall // this thread's role is a sender role // put your data asynchronously - if ((threadIdx.x % 32) == 0) - devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + if ((threadIdx.x % 32) == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); // make sure everyone is put their data before some thread randomly blocks everyone else in signal __syncthreads(); // push with flag and sync to make sure the data is received - if ((threadIdx.x % 32) == 0) - devChan.flush(); + if ((threadIdx.x % 32) == 0) devChan.flush(); // this thread's role is a receiver role. 
wait on the semaphore to make sure the data is ready - if ((threadIdx.x % 32) == 0) - devChan.wait(); + if ((threadIdx.x % 32) == 0) devChan.wait(); } __device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, - int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) -{ + int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) { // this allgather algorithm works as follows: // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode // and waits for data from GPU rank (i-1) % nranksPerNode @@ -83,31 +77,26 @@ __device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, in for (int i = 1; i < nranksPerNode; i++) { if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { // put your data to GPU (rank+i) % nranksPerNode and signal in one call - if ((threadIdx.x % 32) == 0) - devChan.putWithSignal(offset, size); + if ((threadIdx.x % 32) == 0) devChan.putWithSignal(offset, size); } // wait for the data from GPU (rank-i) % nranksPerNode to arrive if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { - if ((threadIdx.x % 32) == 0) - devChan.wait(); + if ((threadIdx.x % 32) == 0) devChan.wait(); } asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); } } __device__ void allgather1(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode, - int remoteRank, size_t nelemsPerGPU) -{ + int remoteRank, size_t nelemsPerGPU) { localAllGather(devChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); if (remoteRank / nranksPerNode == rank / nranksPerNode) - if ((threadIdx.x % 32) == 0) - devChan.flush(); + if ((threadIdx.x % 32) == 0) devChan.flush(); } __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int world_size, int nranksPerNode, - int remoteRank, size_t nelemsPerGPU) -{ + int remoteRank, 
size_t nelemsPerGPU) { // this allgather is a pipelined and hierarchical one and only works for two nodes // it is implemented as follows: // Step 1: each node does a local allgather and concurrently, @@ -132,8 +121,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra if ((threadIdx.x % 32) == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devChan.wait(); + if ((threadIdx.x % 32) == 0) devChan.wait(); } __syncthreads(); @@ -152,8 +140,7 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra if ((threadIdx.x % 32) == 0) devChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), nelemsPerGPU / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devChan.wait(); + if ((threadIdx.x % 32) == 0) devChan.wait(); } __syncthreads(); @@ -167,13 +154,11 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra } if (remoteRank / nranksPerNode == rank / nranksPerNode || remoteRank % nranksPerNode == rank % nranksPerNode) { - if ((threadIdx.x % 32) == 0) - devChan.flush(); + if ((threadIdx.x % 32) == 0) devChan.flush(); } } -__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) -{ +__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) { // find the mapping between remoteRank and devChans int warpId = threadIdx.x / 32; int remoteRank = (warpId < rank) ? 
warpId : warpId + 1; @@ -188,18 +173,11 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem allgather2(devChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); } -int rankToLocalRank(int rank) -{ - return rank % nranksPerNode; -} +int rankToLocalRank(int rank) { return rank % nranksPerNode; } -int rankToNode(int rank) -{ - return rank / nranksPerNode; -} +int rankToNode(int rank) { return rank / nranksPerNode; } -void print_usage(const char* prog) -{ +void print_usage(const char* prog) { #ifdef MSCCLPP_USE_MPI_FOR_TESTS printf("usage: %s IP:PORT [rank nranks]\n", prog); #else @@ -208,8 +186,7 @@ void print_usage(const char* prog) } void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, - int** data_d) -{ + int** data_d) { CUDACHECK(cudaMalloc(data_d, dataSize)); CUDACHECK(cudaMemset(*data_d, 0, dataSize)); @@ -226,8 +203,7 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz } void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& comm, - mscclpp::channel::DeviceChannelService& channelService, int* data_d, size_t dataSize) -{ + mscclpp::channel::DeviceChannelService& channelService, int* data_d, size_t dataSize) { int thisNode = rankToNode(rank); int cudaNum = rankToLocalRank(rank); std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); @@ -237,8 +213,7 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co std::vector> remoteMemories; for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; + if (r == rank) continue; mscclpp::Transport transport; if (rankToNode(r) == thisNode) { transport = mscclpp::Transport::CudaIpc; @@ -267,8 +242,7 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size())); } -void printUsage(const char* prog, bool isMpi) -{ +void printUsage(const 
char* prog, bool isMpi) { if (isMpi) { std::string st = "you are using MPI for this test\n"; st += "two possilbe usages are:\n"; @@ -284,8 +258,7 @@ void printUsage(const char* prog, bool isMpi) } } -std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) -{ +std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) { std::unordered_map options; for (int i = 1; i < argc; i++) { @@ -356,8 +329,7 @@ std::unordered_map parseArgs(int argc, const char* arg return options; } -int main(int argc, const char* argv[]) -{ +int main(int argc, const char* argv[]) { bool isMpi = false; #ifdef MSCCLPP_USE_MPI_FOR_TESTS isMpi = true; @@ -415,27 +387,22 @@ int main(int argc, const char* argv[]) size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; try { - if (rank == 0) - printf("Initializing MSCCL++\n"); + if (rank == 0) printf("Initializing MSCCL++\n"); auto bootstrapper = std::make_shared(rank, world_size); bootstrapper->initialize(ip_port); mscclpp::Communicator comm(bootstrapper); mscclpp::channel::DeviceChannelService channelService(comm); - if (rank == 0) - printf("Initializing data for allgather test\n"); + if (rank == 0) printf("Initializing data for allgather test\n"); initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); - if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); + if (rank == 0) printf("Setting up the connection in MSCCL++\n"); setupMscclppConnections(rank, world_size, comm, channelService, data_d, dataSize); - if (rank == 0) - printf("Launching MSCCL++ proxy threads\n"); + if (rank == 0) printf("Launching MSCCL++ proxy threads\n"); channelService.startProxy(); - if (rank == 0) - printf("Testing the correctness of AllGather implementation\n"); + if (rank == 0) printf("Testing the correctness of AllGather implementation\n"); cudaStream_t stream; CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); CUDACHECK(cudaDeviceSynchronize()); @@ -453,13 +420,11 @@ 
int main(int argc, const char* argv[]) int tmp[16]; // A simple barrier bootstrapper->allGather(tmp, sizeof(int)); - if (rank == 0) - printf("Successfully checked the correctness\n"); + if (rank == 0) printf("Successfully checked the correctness\n"); // Perf test int iterwithoutcudagraph = 10; - if (rank == 0) - printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); + if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); CUDACHECK(cudaStreamSynchronize(stream)); bootstrapper->allGather(tmp, sizeof(int)); for (int i = 0; i < iterwithoutcudagraph; ++i) { @@ -470,8 +435,7 @@ int main(int argc, const char* argv[]) // cudaGraph Capture int cudagraphiter = 10; - if (rank == 0) - printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); + if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); cudaGraph_t graph; cudaGraphExec_t instance; cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); @@ -510,8 +474,7 @@ int main(int argc, const char* argv[]) (double)(dataSize) / 1e9 / (time_in_us / 1e6)); bootstrapper->allGather(tmp, sizeof(int)); - if (rank == 0) - printf("Stopping MSCCL++ proxy threads\n"); + if (rank == 0) printf("Stopping MSCCL++ proxy threads\n"); channelService.stopProxy(); } catch (std::exception& e) { diff --git a/test/allgather_test_host_offloading.cu b/test/allgather_test_host_offloading.cu index 63c72c2f..6a114d7c 100644 --- a/test/allgather_test_host_offloading.cu +++ b/test/allgather_test_host_offloading.cu @@ -1,18 +1,19 @@ +#include #include +#include #include #include -#include -#include -#include +#include #ifdef MSCCLPP_USE_MPI_FOR_TESTS #include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include +#endif // MSCCLPP_USE_MPI_FOR_TESTS #include #include -#include #include + +#include +#include #include int nranksPerNode; @@ -22,18 +23,17 @@ int world_size; // Propagate errors up // Check 
CUDA RT calls -#define CUCHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ +#define CUCHECK(cmd) \ + do { \ + cudaError_t err = cmd; \ + if (err != cudaSuccess) { \ + printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + exit(EXIT_FAILURE); \ + } \ } while (false) // Measure current time in second. -static double getTime(void) -{ +static double getTime(void) { struct timespec tspec; if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { printf("clock_gettime failed\n"); @@ -42,38 +42,28 @@ static double getTime(void) return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; } - -__global__ void kernel(int r, int nranks, mscclpp::DeviceProxyFifo fifo, mscclpp::DeviceEpoch::DeviceHandle* handles, int handleIndex) -{ +__global__ void kernel(int r, int nranks, mscclpp::DeviceProxyFifo fifo, mscclpp::DeviceEpoch::DeviceHandle* handles, + int handleIndex) { int tid = threadIdx.x; - if (tid != r) - handles[tid].epochIncrement(); + if (tid != r) handles[tid].epochIncrement(); __syncthreads(); // uint64_t tail; - if (tid == 0){ + if (tid == 0) { mscclpp::ProxyTrigger trigger; trigger.fst = handleIndex; fifo.push(trigger); // tail = fifo.push(trigger); } - if (tid != r) - handles[tid].wait(); + if (tid != r) handles[tid].wait(); // if (tid == 0) // while(*(volatile uint64_t*)fifo.tailReplica < tail) {}; } -int rankToLocalRank(int rank) -{ - return rank % nranksPerNode; -} +int rankToLocalRank(int rank) { return rank % nranksPerNode; } -int rankToNode(int rank) -{ - return rank / nranksPerNode; -} +int rankToNode(int rank) { return rank / nranksPerNode; } -void print_usage(const char* prog) -{ +void print_usage(const char* prog) { #ifdef MSCCLPP_USE_MPI_FOR_TESTS printf("usage: %s IP:PORT [rank nranks]\n", prog); #else @@ -82,8 +72,7 @@ void print_usage(const char* prog) } void initializeAndAllocateAllGatherData(int 
rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, - int** data_d) -{ + int** data_d) { CUCHECK(cudaMalloc(data_d, dataSize)); CUCHECK(cudaMemset(*data_d, 0, dataSize)); @@ -100,7 +89,7 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz } class MyProxyService { -private: + private: int deviceNumaNode_; mscclpp::Proxy proxy_; std::vector remoteMemories_; @@ -109,13 +98,17 @@ private: std::vector> deviceEpochs1_; std::vector> deviceEpochs2_; std::vector> connections_; - int dataSize_; -public: - MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize) : remoteMemories_(world_size), connections_(world_size), dataSize_(dataSize), - proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) { + int dataSize_; + + public: + MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize) + : remoteMemories_(world_size), + connections_(world_size), + dataSize_(dataSize), + proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) { int cudaDevice; CUCHECK(cudaGetDevice(&cudaDevice)); - getDeviceNumaNode(cudaDevice, &deviceNumaNode_); + deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice); int thisNode = rankToNode(rank); int cudaNum = rankToLocalRank(rank); @@ -123,10 +116,9 @@ public: mscclpp::Transport ibTransport = mscclpp::getIBTransportByDeviceName(ibDevStr); std::vector> remoteMemoriesFuture(world_size); - localMemory_ = comm.registerMemory(data_d, dataSize, mscclpp::Transport::CudaIpc | ibTransport); for (int r = 0; r < world_size; ++r) { - if (r == rank){ + if (r == rank) { hostEpochs_.emplace_back(nullptr); deviceEpochs1_.emplace_back(nullptr); deviceEpochs2_.emplace_back(nullptr); @@ -148,23 +140,23 @@ public: deviceEpochs1_.emplace_back(std::make_shared(comm, connections_[r])); deviceEpochs2_.emplace_back(std::make_shared(comm, connections_[r])); comm.sendMemoryOnSetup(localMemory_, 
r, 0); - + remoteMemoriesFuture[r] = comm.recvMemoryOnSetup(r, 0); } comm.setup(); for (int r = 0; r < world_size; ++r) { - if (r == rank){ + if (r == rank) { continue; } remoteMemories_[r] = remoteMemoriesFuture[r].get(); - } + } } void bindThread() { if (deviceNumaNode_ >= 0) { - numaBind(deviceNumaNode_); + mscclpp::numaBind(deviceNumaNode_); } } @@ -174,45 +166,34 @@ public: int dataSizePerRank = dataSize_ / world_size; for (int r = 1; r < world_size; ++r) { int nghr = (rank + r) % world_size; - connections_[nghr]->write(remoteMemories_[nghr], rank*dataSizePerRank, localMemory_, rank*dataSizePerRank, dataSizePerRank); + connections_[nghr]->write(remoteMemories_[nghr], rank * dataSizePerRank, localMemory_, rank * dataSizePerRank, + dataSizePerRank); if (triggerRaw.fst == 1) deviceEpochs1_[nghr]->signal(); else deviceEpochs2_[nghr]->signal(); - if ((flusher % 64) == 0 && mscclpp::AllIBTransports.has(connections_[nghr]->transport())){ + if ((flusher % 64) == 0 && mscclpp::AllIBTransports.has(connections_[nghr]->transport())) { // if we are using IB transport, we need a flush every once in a while connections_[nghr]->flush(); } } flusher++; - } return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue; } - void start(){ - proxy_.start(); - } + void start() { proxy_.start(); } - void stop(){ - proxy_.stop(); - } + void stop() { proxy_.stop(); } - mscclpp::HostProxyFifo& fifo(){ - return proxy_.fifo(); - } + mscclpp::HostProxyFifo& fifo() { return proxy_.fifo(); } - mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle1(int r){ - return deviceEpochs1_[r]->deviceHandle(); - } + mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle1(int r) { return deviceEpochs1_[r]->deviceHandle(); } - mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle2(int r){ - return deviceEpochs2_[r]->deviceHandle(); - } + mscclpp::DeviceEpoch::DeviceHandle getDeviceHandle2(int r) { return deviceEpochs2_[r]->deviceHandle(); } }; -std::unordered_map parseArgs(int argc, char* argv[]) -{ 
+std::unordered_map parseArgs(int argc, char* argv[]) { std::unordered_map options; for (int i = 1; i < argc; i++) { @@ -234,9 +215,7 @@ std::unordered_map parseArgs(int argc, char* argv[]) return options; } - -int main(int argc, char* argv[]) -{ +int main(int argc, char* argv[]) { // sleep(10); MPI_Init(&argc, &argv); auto parsedArgs = parseArgs(argc, argv); @@ -251,16 +230,13 @@ int main(int argc, char* argv[]) nranksPerNode = shmrank; MPI_Comm_free(&shmcomm); - int cudaNum = rankToLocalRank(rank); CUCHECK(cudaSetDevice(cudaNum)); - if (rank == 0) - printf("Initializing MSCCL++\n"); + if (rank == 0) printf("Initializing MSCCL++\n"); auto bootstrap = std::make_shared(rank, world_size); mscclpp::UniqueId uniqueId; - if (rank == 0) - uniqueId = bootstrap->createUniqueId(); + if (rank == 0) uniqueId = bootstrap->createUniqueId(); MPI_Bcast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0, MPI_COMM_WORLD); bootstrap->initialize(uniqueId); mscclpp::Communicator comm(bootstrap); @@ -273,22 +249,18 @@ int main(int argc, char* argv[]) } size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; - if (rank == 0) - printf("Initializing data for allgather test\n"); + if (rank == 0) printf("Initializing data for allgather test\n"); initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); - if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); + if (rank == 0) printf("Setting up the connection in MSCCL++\n"); MyProxyService proxyService(comm, data_d, dataSize); // setupProxyService(comm, proxyService, data_d, dataSize); - if (rank == 0) - printf("Launching MSCCL++ proxy threads\n"); + if (rank == 0) printf("Launching MSCCL++ proxy threads\n"); proxyService.start(); mscclpp::DeviceProxyFifo fifo = proxyService.fifo().deviceFifo(); - if (rank == 0) - printf("Testing the correctness of AllGather implementation\n"); + if (rank == 0) printf("Testing the correctness of AllGather implementation\n"); cudaStream_t stream; 
CUCHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); mscclpp::DeviceEpoch::DeviceHandle* deviceHandles1; @@ -296,18 +268,18 @@ int main(int argc, char* argv[]) CUCHECK(cudaMalloc(&deviceHandles1, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * world_size)); for (int i = 0; i < world_size; ++i) { - if (i == rank) - continue; + if (i == rank) continue; auto handle = proxyService.getDeviceHandle1(i); - CUCHECK(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice)); + CUCHECK( + cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice)); } CUCHECK(cudaMalloc(&deviceHandles2, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * world_size)); for (int i = 0; i < world_size; ++i) { - if (i == rank) - continue; + if (i == rank) continue; auto handle = proxyService.getDeviceHandle2(i); - CUCHECK(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice)); + CUCHECK( + cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), cudaMemcpyHostToDevice)); } kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1); @@ -324,13 +296,11 @@ int main(int argc, char* argv[]) } bootstrap->barrier(); - if (rank == 0) - printf("Correctness test passed!\n"); + if (rank == 0) printf("Correctness test passed!\n"); double t0, t1, ms, time_in_us; int iterwithoutcudagraph = 10; - if (rank == 0) - printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); + if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); CUCHECK(cudaStreamSynchronize(stream)); bootstrap->barrier(); t0 = getTime(); @@ -344,12 +314,11 @@ int main(int argc, char* argv[]) ms = (t1 - t0) * 1000.0; time_in_us = ms * 1000. 
/ (float)iterwithoutcudagraph / 2; printf("No Graph %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); + (double)(dataSize) / 1e9 / (time_in_us / 1e6)); // cudaGraph Capture int cudagraphiter = 10; - if (rank == 0) - printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); + if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); cudaGraph_t graph; cudaGraphExec_t instance; cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); @@ -363,7 +332,7 @@ int main(int argc, char* argv[]) int cudagraphwarmup = 10; if (rank == 0) printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, - cudagraphiter); + cudagraphiter); for (int i = 0; i < cudagraphwarmup; ++i) { cudaGraphLaunch(instance, stream); } @@ -373,7 +342,7 @@ int main(int argc, char* argv[]) int cudagraphlaunch = 10; if (rank == 0) printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, - cudagraphiter); + cudagraphiter); bootstrap->barrier(); t0 = getTime(); for (int i = 0; i < cudagraphlaunch; ++i) { @@ -386,11 +355,10 @@ int main(int argc, char* argv[]) time_in_us = ms * 1000. 
/ (float)cudagraphlaunch / (float)cudagraphiter / 2; if (rank == 0) printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); + (double)(dataSize) / 1e9 / (time_in_us / 1e6)); bootstrap->barrier(); - if (rank == 0) - printf("Stopping MSCCL++ proxy threads\n"); + if (rank == 0) printf("Stopping MSCCL++ proxy threads\n"); proxyService.stop(); MSCCLPP_CUDATHROW(cudaFree(data_d)); diff --git a/test/allreduce_test.cu b/test/allreduce_test.cu index 7f2bf176..0a5fcc07 100644 --- a/test/allreduce_test.cu +++ b/test/allreduce_test.cu @@ -13,84 +13,57 @@ void* scratch = nullptr; void* sendRecvData = nullptr; cuda::barrier* barrier = nullptr; -struct Chunk -{ +struct Chunk { size_t offset; size_t size; }; -inline int getSendTag(int rank, int peer) -{ - return rank < peer ? 0 : 1; -} +inline int getSendTag(int rank, int peer) { return rank < peer ? 0 : 1; } -inline int getRecvTag(int rank, int peer) -{ - return rank < peer ? 1 : 0; -} +inline int getRecvTag(int rank, int peer) { return rank < peer ? 1 : 0; } -__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) -{ +__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) { size_t remainder = dataCount % numChunks; size_t smallChunkSize = dataCount / numChunks; size_t largeChunkSize = smallChunkSize + 1; size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; size_t numSmallChunks = chunkCount - numLargeChunks; - size_t offset = - (remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize; + size_t offset = (remainder - numLargeChunks) * largeChunkSize + + (chunkIdx > remainder ? 
chunkIdx - remainder : 0) * smallChunkSize; return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize}; } -__host__ __device__ int peerIdx(int peerRank, int rank) -{ - return peerRank < rank ? peerRank : peerRank - 1; -} +__host__ __device__ int peerIdx(int peerRank, int rank) { return peerRank < rank ? peerRank : peerRank - 1; } -__host__ __device__ int peerRank(int peerIdx, int rank) -{ - return peerIdx < rank ? peerIdx : peerIdx + 1; -} +__host__ __device__ int peerRank(int peerIdx, int rank) { return peerIdx < rank ? peerIdx : peerIdx + 1; } -__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) -{ - return peerIdx(peerRank, rank) * 3; -} +__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3; } -__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) -{ - return peerIdx(peerRank, rank) * 3 + 1; -} +__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 1; } -__host__ __device__ int phase2ConnIdx(int peerRank, int rank) -{ - return peerIdx(peerRank, rank) * 3 + 2; -} +__host__ __device__ int phase2ConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 2; } -__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) -{ +__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) { if (threadIdx.x == 0) { conn.putWithSignalAndFlush(dstOffset, srcOffset, size); } __syncthreads(); } -__device__ void recv(mscclppDevConn_t& conn) -{ +__device__ void recv(mscclppDevConn_t& conn) { if (threadIdx.x == 0) { conn.wait(); } __syncthreads(); } -__device__ void reduceSum(int* dst, int* src, size_t size) -{ +__device__ void reduceSum(int* dst, int* src, size_t size) { for (int i = threadIdx.x; i < size; i += blockDim.x) { dst[i] += src[i]; } } -__global__ void initData(int* data, size_t size, int rank) -{ +__global__ void initData(int* data, 
size_t size, int rank) { for (int i = threadIdx.x; i < size; i += blockDim.x) { data[i] = rank; } @@ -98,8 +71,7 @@ __global__ void initData(int* data, size_t size, int rank) __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount, mscclppDevConn_t* conns, void* scratch, void* sendRecvData, - cuda::barrier* barrier) -{ + cuda::barrier* barrier) { int idx = blockIdx.x; int peer = peerRank(idx, rank); mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)]; @@ -116,8 +88,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int)); recv(phase1RecvConn); - if (threadIdx.x == 0) - barrier->arrive_and_wait(); + if (threadIdx.x == 0) barrier->arrive_and_wait(); __syncthreads(); // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer @@ -135,8 +106,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); } - if (threadIdx.x == 0) - barrier->arrive_and_wait(); + if (threadIdx.x == 0) barrier->arrive_and_wait(); __syncthreads(); // 2nd communication phase: send the now reduced data between the user buffers @@ -147,8 +117,7 @@ __global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t } void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, - size_t* recvInplaceOffset, size_t count, int nranks) -{ + size_t* recvInplaceOffset, size_t count, int nranks) { size_t base = (count / ALIGN) * ALIGN; *sendcount = base; *recvcount = base; @@ -157,14 +126,12 @@ void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* par *paramcount = base; } -void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t 
count, int nranks) -{ +void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) { size_t paramcount, sendInplaceOffset, recvInplaceOffset; AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); } -testResult_t AllReduceInitData(struct testArgs* args, int in_place) -{ +testResult_t AllReduceInitData(struct testArgs* args, int in_place) { size_t recvcount = args->expectedBytes / sizeof(int); CUDACHECK(cudaSetDevice(args->gpuNum)); @@ -182,8 +149,7 @@ testResult_t AllReduceInitData(struct testArgs* args, int in_place) return testSuccess; } -void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) -{ +void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { double baseBw = (double)(count * typesize) / 1.0E9 / sec; *algBw = baseBw; @@ -192,8 +158,7 @@ void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, doubl } testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm, - cudaStream_t stream, int kernelNum) -{ + cudaStream_t stream, int kernelNum) { int worldSize = comm->nRanks; int nPeers = worldSize - 1; int dataCount = nBytes / sizeof(int); @@ -207,8 +172,7 @@ testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData, AllReduceGetBw, AllReduceRunColl}; -testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) -{ +testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) { int rank = args->proc, worldSize = args->totalProcs; size_t bufferSize = args->maxbytes; Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1); @@ -224,7 +188,7 @@ testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) 
MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr)); MSCCLPPCHECK( - mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); + mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); } } MSCCLPPCHECK(mscclppConnectionSetup(args->comm)); @@ -232,8 +196,7 @@ testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) return testSuccess; } -testResult_t AllReduceTeardownMscclppConnections() -{ +testResult_t AllReduceTeardownMscclppConnections() { if (scratch != nullptr) { CUDACHECK(cudaFree(scratch)); scratch = nullptr; @@ -241,15 +204,14 @@ testResult_t AllReduceTeardownMscclppConnections() return testSuccess; } -testResult_t AllReduceRunTest(struct testArgs* args) -{ +testResult_t AllReduceRunTest(struct testArgs* args) { args->collTest = &allReduceTest; sendRecvData = args->recvbuff; CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier))); cuda::barrier initBarrier(args->totalProcs - 1); CUDACHECK( - cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); + cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); int nPeers = args->totalProcs - 1; int rank = args->proc; std::vector hostConns(nPeers * 3, mscclppDevConn_t()); diff --git a/test/bootstrap_test.cc b/test/bootstrap_test.cc deleted file mode 100644 index 0715d24f..00000000 --- a/test/bootstrap_test.cc +++ /dev/null @@ -1,116 +0,0 @@ -#include "mscclpp.h" -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include -#endif -#include -#include -#include -#include - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \ - return res; 
\ - } \ - } while (0); - -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - std::string st = "you are using MPI for this test\n"; - st += "tow possilbe usages are:\n"; - st += "> " + std::string(prog) + "\n"; - st += "or\n"; - st += "> " + std::string(prog) + " ip:port\n"; - printf("%s", st.c_str()); -#else - std::string st = "you are NOT using MPI for this test\n"; - st += "the only possible usage:\n"; - st += "> " + std::string(prog) + " ip:port rank world_size\n"; - printf("%s", st.c_str()); -#endif -} - -void myLogHandler(const char* msg) -{ - printf("myLogger: %s", msg); -} - -int main(int argc, const char* argv[]) -{ - if (argc >= 2 && (std::string(argv[1]) == "-h" || std::string(argv[1]) == "--help")) { - print_usage(argv[0]); - return 0; - } - int rank, world_size; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc > 2) { - print_usage(argv[0]); - return -1; - } - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - const char* ip_port; - if (argc == 2) - ip_port = argv[1]; - else - ip_port = NULL; -#else - if (argc != 4) { - print_usage(argv[0]); - return -1; - } - const char* ip_port = argv[1]; - rank = atoi(argv[2]); - world_size = atoi(argv[3]); -#endif - - MSCCLPPCHECK(mscclppSetLogHandler(myLogHandler)); - mscclppComm_t comm; - if (ip_port) { - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); - } else { -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - mscclppUniqueId id; - if (rank == 0) - MSCCLPPCHECK(mscclppGetUniqueId(&id)); - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - MSCCLPPCHECK(mscclppCommInitRankFromId(&comm, world_size, id, rank)); -#else - fprintf(stderr, "this should have not been possible!\n"); - return -1; -#endif - } - - // allocate some test buffer - int* buf = (int*)calloc(world_size, sizeof(int)); - if (buf == nullptr) { - printf("calloc failed\n"); - return -1; - } - // each rank sets one element in the array - buf[rank] = rank; - - 
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, buf, sizeof(int))); - - // check the correctness of all elements in the output of AllGather - for (int i = 0; i < world_size; ++i) { - if (buf[i] != i) { - printf("wrong data: %d, expected %d\n", buf[i], i); - return -1; - } - } - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Finalize(); -#endif - - printf("Rank %d Succeeded\n", rank); - return 0; -} diff --git a/test/bootstrap_test_cpp.cc b/test/bootstrap_test_cpp.cc index b32d83fa..12a4d9a1 100644 --- a/test/bootstrap_test_cpp.cc +++ b/test/bootstrap_test_cpp.cc @@ -1,34 +1,28 @@ -#include +#include #include #include #include -#include +#include -void test_allgather(std::shared_ptr bootstrap) -{ +void test_allgather(std::shared_ptr bootstrap) { std::vector tmp(bootstrap->getNranks(), 0); tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; bootstrap->allGather(tmp.data(), sizeof(int)); for (int i = 0; i < bootstrap->getNranks(); i++) { assert(tmp[i] == i + 1); } - if (bootstrap->getRank() == 0) - std::cout << "AllGather test passed!" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "AllGather test passed!" << std::endl; } -void test_barrier(std::shared_ptr bootstrap) -{ +void test_barrier(std::shared_ptr bootstrap) { bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Barrier test passed!" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Barrier test passed!" 
<< std::endl; } -void test_sendrecv(std::shared_ptr bootstrap) -{ +void test_sendrecv(std::shared_ptr bootstrap) { for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) - continue; + if (bootstrap->getRank() == i) continue; int msg1 = (bootstrap->getRank() + 1) * 3; int msg2 = (bootstrap->getRank() + 1) * 3 + 1; int msg3 = (bootstrap->getRank() + 1) * 3 + 2; @@ -38,8 +32,7 @@ void test_sendrecv(std::shared_ptr bootstrap) } for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) - continue; + if (bootstrap->getRank() == i) continue; int msg1 = 0; int msg2 = 0; int msg3 = 0; @@ -51,102 +44,79 @@ void test_sendrecv(std::shared_ptr bootstrap) assert(msg2 == (i + 1) * 3 + 1); assert(msg3 == (i + 1) * 3 + 2); } - if (bootstrap->getRank() == 0) - std::cout << "Send/Recv test passed!" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Send/Recv test passed!" << std::endl; } -void test_all(std::shared_ptr bootstrap) -{ +void test_all(std::shared_ptr bootstrap) { test_allgather(bootstrap); test_barrier(bootstrap); test_sendrecv(bootstrap); } -void test_mscclpp_bootstrap_with_id(int rank, int worldSize) -{ +void test_mscclpp_bootstrap_with_id(int rank, int worldSize) { auto bootstrap = std::make_shared(rank, worldSize); mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) - id = bootstrap->createUniqueId(); + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); bootstrap->initialize(id); test_all(bootstrap); - if (bootstrap->getRank() == 0) - std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! 
---" << std::endl; } -void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPiar) -{ +void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPair) { std::shared_ptr bootstrap(new mscclpp::Bootstrap(rank, worldSize)); - bootstrap->initialize(ipPortPiar); + bootstrap->initialize(ipPortPair); test_all(bootstrap); - if (bootstrap->getRank() == 0) - std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl; } -class MPIBootstrap : public mscclpp::BaseBootstrap -{ -public: - MPIBootstrap() : BaseBootstrap() - { - } - int getRank() override - { +class MPIBootstrap : public mscclpp::BaseBootstrap { + public: + MPIBootstrap() : BaseBootstrap() {} + int getRank() override { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); return rank; } - int getNranks() override - { + int getNranks() override { int worldSize; MPI_Comm_size(MPI_COMM_WORLD, &worldSize); return worldSize; } - void allGather(void* sendbuf, int size) override - { + void allGather(void* sendbuf, int size) override { MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); } - void barrier() override - { - MPI_Barrier(MPI_COMM_WORLD); - } - void send(void* sendbuf, int size, int dest, int tag) override - { + void barrier() override { MPI_Barrier(MPI_COMM_WORLD); } + void send(void* sendbuf, int size, int dest, int tag) override { MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); } - void recv(void* recvbuf, int size, int source, int tag) override - { + void recv(void* recvbuf, int size, int source, int tag) override { MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } }; -void test_mpi_bootstrap() -{ +void test_mpi_bootstrap() { std::shared_ptr bootstrap(new MPIBootstrap()); test_all(bootstrap); - if (bootstrap->getRank() == 0) - std::cout << 
"--- MPI Bootstrap test passed! ---" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "--- MPI Bootstrap test passed! ---" << std::endl; } -int main(int argc, char** argv) -{ +int main(int argc, char** argv) { int rank, worldSize; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &worldSize); if (argc > 2) { - if (rank == 0) - std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl; + if (rank == 0) std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl; MPI_Finalize(); return 0; } test_mscclpp_bootstrap_with_id(rank, worldSize); - if (argc == 2) - test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]); + if (argc == 2) test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]); test_mpi_bootstrap(); MPI_Finalize(); return 0; -} \ No newline at end of file +} diff --git a/test/communicator_test_cpp.cu b/test/communicator_test_cpp.cu index f7ab270d..38b9fe62 100644 --- a/test/communicator_test_cpp.cu +++ b/test/communicator_test_cpp.cu @@ -1,23 +1,22 @@ -#include -#include +#include +#include #include -#include #include #include -#include +#include +#include #include -#define CUDATHROW(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \ - } \ +#define CUDATHROW(cmd) \ + do { \ + cudaError_t err = cmd; \ + if (err != cudaSuccess) { \ + throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \ + } \ } while (false) -mscclpp::Transport findIb(int localRank) -{ +mscclpp::Transport findIb(int localRank) { mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, mscclpp::Transport::IB6, mscclpp::Transport::IB7}; @@ -27,8 +26,7 @@ mscclpp::Transport findIb(int localRank) void register_all_memories(mscclpp::Communicator& 
communicator, int rank, int worldSize, void* devicePtr, size_t deviceBufferSize, mscclpp::Transport myIbDevice, mscclpp::RegisteredMemory& localMemory, - std::unordered_map& remoteMemory) -{ + std::unordered_map& remoteMemory) { localMemory = communicator.registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice); std::unordered_map> futureRemoteMemory; for (int i = 0; i < worldSize; i++) { @@ -47,8 +45,7 @@ void register_all_memories(mscclpp::Communicator& communicator, int rank, int wo void make_connections(mscclpp::Communicator& communicator, int rank, int worldSize, int nRanksPerNode, mscclpp::Transport myIbDevice, - std::unordered_map>& connections) -{ + std::unordered_map>& connections) { for (int i = 0; i < worldSize; i++) { if (i != rank) { if (i / nRanksPerNode == rank / nRanksPerNode) { @@ -63,8 +60,7 @@ void make_connections(mscclpp::Communicator& communicator, int rank, int worldSi void write_remote(int rank, int worldSize, std::unordered_map>& connections, std::unordered_map& remoteRegisteredMemories, - mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank) -{ + mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank) { for (int i = 0; i < worldSize; i++) { if (i != rank) { auto& conn = connections.at(i); @@ -76,8 +72,7 @@ void write_remote(int rank, int worldSize, std::unordered_map& devicePtr) -{ +void device_buffer_init(int rank, int worldSize, int dataCount, std::vector& devicePtr) { for (int n = 0; n < (int)devicePtr.size(); n++) { std::vector hostBuffer(dataCount, 0); for (int i = 0; i < dataCount; i++) { @@ -89,8 +84,7 @@ void device_buffer_init(int rank, int worldSize, int dataCount, std::vector& devicePtr, bool skipLocal = false) -{ + std::vector& devicePtr, bool skipLocal = false) { for (int n = 0; n < (int)devicePtr.size(); n++) { std::vector hostBuffer(dataCount, 0); CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], dataCount * sizeof(int), cudaMemcpyDeviceToHost)); @@ -112,16 +106,13 @@ 
void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize std::shared_ptr bootstrap, std::unordered_map>& connections, std::vector>& remoteMemory, - std::vector& localMemory, std::vector& devicePtr, int numBuffers) -{ - + std::vector& localMemory, std::vector& devicePtr, int numBuffers) { assert((deviceBufferSize / sizeof(int)) % worldSize == 0); size_t dataCount = deviceBufferSize / sizeof(int); device_buffer_init(rank, worldSize, dataCount, devicePtr); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "CUDA memory initialization passed" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; for (int n = 0; n < numBuffers; n++) { write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); @@ -145,20 +136,17 @@ void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize if (bootstrap->getRank() == 0) std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - if (bootstrap->getRank() == 0) - std::cout << "--- Testing vanialla writes passed ---" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "--- Testing vanialla writes passed ---" << std::endl; } -__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) -{ +__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { int tid = threadIdx.x; if (tid != rank && tid < worldSize) { deviceEpochs[tid].epochIncrement(); } } -__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) -{ +__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { int tid = threadIdx.x; if (tid != rank && tid < worldSize) { deviceEpochs[tid].wait(); @@ -171,9 +159,7 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i 
std::unordered_map>& connections, std::vector>& remoteMemory, std::vector& localMemory, std::vector& devicePtr, - int numBuffers) -{ - + int numBuffers) { std::unordered_map> epochs; for (auto entry : connections) { auto& conn = entry.second; @@ -181,16 +167,14 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i } communicator.setup(); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Epochs are created" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl; assert((deviceBufferSize / sizeof(int)) % worldSize == 0); size_t dataCount = deviceBufferSize / sizeof(int); device_buffer_init(rank, worldSize, dataCount, devicePtr); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "CUDA memory initialization passed" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; mscclpp::DeviceEpoch::DeviceHandle* deviceEpochHandles; CUDATHROW(cudaMalloc(&deviceEpochHandles, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * worldSize)); @@ -204,8 +188,7 @@ void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, i CUDATHROW(cudaDeviceSynchronize()); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "CUDA device epochs are created" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "CUDA device epochs are created" << std::endl; for (int n = 0; n < numBuffers; n++) { write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); @@ -238,32 +221,26 @@ void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int std::unordered_map>& connections, std::vector>& remoteMemory, std::vector& localMemory, std::vector& devicePtr, - int numBuffers) -{ - + int numBuffers) { std::unordered_map> epochs; for (auto entry : connections) { auto& conn = entry.second; - if (conn->transport() == mscclpp::Transport::CudaIpc) - continue; + 
if (conn->transport() == mscclpp::Transport::CudaIpc) continue; epochs.insert({entry.first, std::make_shared(communicator, conn)}); } communicator.setup(); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Epochs are created" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl; assert((deviceBufferSize / sizeof(int)) % worldSize == 0); size_t dataCount = deviceBufferSize / sizeof(int); device_buffer_init(rank, worldSize, dataCount, devicePtr); bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "CUDA memory initialization passed" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Host epochs are created" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Host epochs are created" << std::endl; for (int n = 0; n < numBuffers; n++) { write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); @@ -291,25 +268,21 @@ void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int << std::endl; } -void test_communicator(int rank, int worldSize, int nRanksPerNode) -{ +void test_communicator(int rank, int worldSize, int nRanksPerNode) { auto bootstrap = std::make_shared(rank, worldSize); mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) - id = bootstrap->createUniqueId(); + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); bootstrap->initialize(id); mscclpp::Communicator communicator(bootstrap); - if (bootstrap->getRank() == 0) - std::cout << "Communicator initialization passed" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Communicator initialization passed" << std::endl; std::unordered_map> connections; auto myIbDevice = findIb(rank % nRanksPerNode); make_connections(communicator, rank, worldSize, 
nRanksPerNode, myIbDevice, connections); - if (bootstrap->getRank() == 0) - std::cout << "Connection setup passed" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "Connection setup passed" << std::endl; int numBuffers = 10; std::vector devicePtr(numBuffers); @@ -319,8 +292,7 @@ void test_communicator(int rank, int worldSize, int nRanksPerNode) std::vector> remoteMemory(numBuffers); for (int n = 0; n < numBuffers; n++) { - if (n % 100 == 0) - std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl; + if (n % 100 == 0) std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl; CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize)); register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n], remoteMemory[n]); @@ -338,16 +310,14 @@ void test_communicator(int rank, int worldSize, int nRanksPerNode) test_write_with_host_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections, remoteMemory, localMemory, devicePtr, numBuffers); - if (bootstrap->getRank() == 0) - std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl; + if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl; for (int n = 0; n < numBuffers; n++) { CUDATHROW(cudaFree(devicePtr[n])); } } -int main(int argc, char** argv) -{ +int main(int argc, char** argv) { int rank, worldSize; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); diff --git a/test/ib_test.cc b/test/ib_test.cc index 8f9f3dc7..da2d0f3a 100644 --- a/test/ib_test.cc +++ b/test/ib_test.cc @@ -1,14 +1,15 @@ -#include "checks.h" #include "ib.hpp" -#include "infiniband/verbs.h" + +#include #include #include -#include #include +#include "checks_internal.hpp" +#include "infiniband/verbs.h" + // Measure current time in second. 
-static double getTime(void) -{ +static double getTime(void) { struct timespec tspec; if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { printf("clock_gettime failed\n"); @@ -20,8 +21,7 @@ static double getTime(void) // Example usage: // Receiver: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 0 0 0 // Sender: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 1 0 0 -int main(int argc, const char* argv[]) -{ +int main(int argc, const char* argv[]) { if (argc != 5) { printf("Usage: %s <0(recv)/1(send)> \n", argv[0]); return 1; @@ -31,7 +31,7 @@ int main(int argc, const char* argv[]) int cudaDevId = atoi(argv[3]); std::string ibDevName = "mlx5_ib" + std::string(argv[4]); - CUDACHECK(cudaSetDevice(cudaDevId)); + MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId)); int nelem = 1; auto data = mscclpp::allocUniqueCuda(nelem); @@ -53,8 +53,7 @@ int main(int argc, const char* argv[]) bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo)); for (int i = 0; i < bootstrap->getNranks(); ++i) { - if (i == isSend) - continue; + if (i == isSend) continue; qp->rtr(qpInfo[i]); qp->rts(); break; diff --git a/test/mscclpp-test/common.cu b/test/mscclpp-test/common.cu index 21b3ca25..77126631 100644 --- a/test/mscclpp-test/common.cu +++ b/test/mscclpp-test/common.cu @@ -68,21 +68,16 @@ double parseSize(const char* value) { return size * units; } -double allreduceTime(int worldSize, double value, int average) -{ +double allreduceTime(int worldSize, double value, int average) { double accumulator = value; if (average != 0) { - MPI_Op op = average == 1 ? MPI_SUM - : average == 2 ? MPI_MIN - : average == 3 ? MPI_MAX - : average == 4 ? MPI_SUM - : MPI_Op(); + MPI_Op op = + average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? 
MPI_SUM : MPI_Op(); MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD); } - if (average == 1) - accumulator /= worldSize; + if (average == 1) accumulator /= worldSize; return accumulator; } } // namespace @@ -94,8 +89,7 @@ BaseTestEngine::BaseTestEngine(bool inPlace) : error_(0), inPlace_(inPlace) { BaseTestEngine::~BaseTestEngine() { cudaStreamDestroy(stream_); } -void BaseTestColl::setupCollTest(const TestArgs& args, size_t size) -{ +void BaseTestColl::setupCollTest(const TestArgs& args, size_t size) { this->worldSize_ = args.totalRanks; this->typeSize_ = sizeof(int); this->setupCollTest(size); @@ -128,12 +122,9 @@ double BaseTestEngine::benchTime() { return deltaSec; } -void BaseTestEngine::barrier() { - this->comm_->bootstrapper()->barrier(); -} +void BaseTestEngine::barrier() { this->comm_->bootstrapper()->barrier(); } -void BaseTestEngine::runTest() -{ +void BaseTestEngine::runTest() { // warm-up for large size this->coll_->setupCollTest(args_, args_.maxBytes); this->barrier(); diff --git a/test/mscclpp-test/common.hpp b/test/mscclpp-test/common.hpp index 1c9ad85f..ddc100e0 100644 --- a/test/mscclpp-test/common.hpp +++ b/test/mscclpp-test/common.hpp @@ -67,7 +67,7 @@ class BaseTestEngine { virtual ~BaseTestEngine(); virtual void allocateBuffer() = 0; - int getTestErrors() {return error_;} + int getTestErrors() { return error_; } void setupTest(); void bootstrap(const TestArgs& args); void runTest(); diff --git a/test/p2p_test.cu b/test/p2p_test.cu deleted file mode 100644 index e2218e83..00000000 --- a/test/p2p_test.cu +++ /dev/null @@ -1,288 +0,0 @@ -#include "mscclpp.h" -#include -#include -#include -#include - -#include "common.h" - -#define RANKS_PER_NODE 8 -#define USE_DMA_FOR_P2P 1 -#define TEST_CONN_TYPE 0 // 0: P2P(for local)+IB(for remote), 1: IB-Only - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - 
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \ - return res; \ - } \ - } while (0); - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} - -__constant__ mscclppDevConn_t constDevConns[16]; - -__global__ void kernel(int rank, int world_size) -{ - if (threadIdx.x % 32 != 0) - return; - - int warpId = threadIdx.x / 32; - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - mscclppDevConn_t devConn = constDevConns[remoteRank]; - volatile int* data = (volatile int*)devConn.localBuff; - volatile uint64_t* localFlag = devConn.localFlag; -#if (USE_DMA_FOR_P2P == 0) - volatile uint64_t* remoteSignalEpochId = devConn.remoteSignalEpochId; -#endif - volatile uint64_t* proxyFlag = devConn.proxyFlag; - - uint64_t baseFlag = *localFlag; - - if (threadIdx.x == 0) { - // Set my data and flag - *(data + rank) = rank + 1; - } - __syncthreads(); - - if (threadIdx.x == 0) { - // Do we need a sys fence? 
- // __threadfence_system(); - *localFlag = baseFlag + 1; - } - - // get a thread-local trigger and a request for waiting on it - // mscclppTrigger_t trig; - // mscclppRequest_t req = devConn.fifo.getTrigger(&trig); - - // Each warp receives data from different ranks -#if (USE_DMA_FOR_P2P == 1) - - // Trigger sending data, flag and synchronize after - auto req = devConn.fifo.putWithSignal(rank * sizeof(int), sizeof(int)); - - // Wait on the request to make sure it is safe to reuse buffer and flag - devConn.fifo.sync(req); - - // Wait for receiving data from remote rank - while (*proxyFlag == baseFlag) { - } - -#else // USE_DMA_FOR_P2P == 0 - - if (devConn.remoteBuff == NULL) { // IB - // Wait until the proxy have sent my data and flag - devConn.waitTrigger(trig); - - // Trigger sending data and flag - devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); - - // Wait for receiving data from remote rank - while (*proxyFlag == baseFlag) { - } - } else { // P2P - // Directly read data - volatile int* remoteData = (volatile int*)devConn.remoteBuff; - - // Wait until the remote data is set - while (*remoteSignalEpochId == baseFlag) { - } - - // Read remote data - data[remoteRank] = remoteData[remoteRank]; - } - -#endif -} - -int rankToLocalRank(int rank) -{ - return rank % RANKS_PER_NODE; -} - -int rankToNode(int rank) -{ - return rank / RANKS_PER_NODE; -} - -int cudaNumToIbNum(int cudaNum) -{ - int ibNum; - if (cudaNum == 0) { - ibNum = 0; - } else if (cudaNum == 1) { - ibNum = 4; - } else if (cudaNum == 2) { - ibNum = 1; - } else if (cudaNum == 3) { - ibNum = 5; - } else if (cudaNum == 4) { - ibNum = 2; - } else if (cudaNum == 5) { - ibNum = 6; - } else if (cudaNum == 6) { - ibNum = 3; - } else if (cudaNum == 7) { - ibNum = 7; - } else { - printf("Invalid cudaNum: %d\n", cudaNum); - exit(EXIT_FAILURE); - } - return ibNum; -} - -int main(int argc, const char* argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); -#endif - 
const char* ip_port; - int rank, world_size; - parse_arguments(argc, argv, &ip_port, &rank, &world_size); - int localRank = rankToLocalRank(rank); - int thisNode = rankToNode(rank); - int cudaNum = localRank; - int ibNum = cudaNumToIbNum(cudaNum); - - CUDACHECK(cudaSetDevice(cudaNum)); - std::string ibDevStr = "mlx5_ib" + std::to_string(ibNum); - - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); - - int* data_d; - uint64_t* flag_d; - size_t data_size = sizeof(int) * world_size; - CUDACHECK(cudaMalloc(&data_d, data_size)); - CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t))); - CUDACHECK(cudaMemset(data_d, 0, data_size)); - CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t))); - - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType = mscclppTransportIB; - const char* ibDev = ibDevStr.c_str(); -#if (TEST_CONN_TYPE == 0) // P2P+IB - if (rankToNode(r) == thisNode) { - transportType = mscclppTransportP2P; - ibDev = NULL; - } -#endif - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, data_size, flag_d, transportType, ibDev)); - } - - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - mscclppDevConn_t* devConns; - int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); - - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * world_size)); - - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - CUDACHECK(cudaDeviceSynchronize()); - - // Read results from GPU - int* buf = (int*)calloc(world_size, sizeof(int)); - if (buf == nullptr) { - printf("calloc failed\n"); - return -1; - } - CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost)); - - bool failed = false; - for (int i = 0; i < world_size; ++i) { - if (buf[i] != 
i + 1) { - printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1); - failed = true; - } - } - if (failed) { - return -1; - } - - // Perf test - cudaEvent_t ev_start; - cudaEvent_t ev_end; - CUDACHECK(cudaEventCreate(&ev_start)); - CUDACHECK(cudaEventCreate(&ev_end)); - - // warm up - // int warmupiter = 10; - // for (int i = 0; i < warmupiter; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - // } - - // cudaGraph Capture - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - int cudagraphiter = 100; - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - // CUDACHECK(cudaEventRecord(ev_start, stream)); - double t0 = getTime(); - int cudagraphlaunch = 10; - for (int i = 0; i < cudagraphlaunch; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - cudaGraphLaunch(instance, stream); - } - // CUDACHECK(cudaEventRecord(ev_end, stream)); - CUDACHECK(cudaStreamSynchronize(stream)); - - double t1 = getTime(); - float ms = (t1 - t0) * 1000.0; - // CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end)); - printf("rank: %d, time: %f us/iter\n", rank, ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter); - - MSCCLPPCHECK(mscclppProxyStop(comm)); - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! 
%d\n", rank); - return 0; -} diff --git a/test/unit/core_tests.cc b/test/unit/core_tests.cc index e3bf7265..7adfb7ea 100644 --- a/test/unit/core_tests.cc +++ b/test/unit/core_tests.cc @@ -1,5 +1,6 @@ -#include #include +#include + #include class LocalCommunicatorTest : public ::testing::Test { diff --git a/test/unit/cuda_memory_tests.cc b/test/unit/cuda_memory_tests.cc index 50bea575..7e670aeb 100644 --- a/test/unit/cuda_memory_tests.cc +++ b/test/unit/cuda_memory_tests.cc @@ -1,4 +1,5 @@ #include + #include TEST(CudaMemoryTest, Shared) {