From 0902ce89c68eeb4148e732b41e3ef4398674c7fc Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Mon, 6 Feb 2023 05:32:24 +0000 Subject: [PATCH] compiles --- Makefile | 4 +- src/debug.cc | 238 +++++++-- src/include/comm.h | 2 +- src/include/core.h | 2 +- src/include/param.h | 29 + src/include/proxy.h | 20 +- src/init_test.cc | 6 +- src/param.cc | 81 +++ src/proxy.cc | 1245 +++++++++++++++++++++++++++++++++++++++++++ src/socket.cc | 820 ++++++++++++++++++++++++++++ src/utils.cc | 293 ++++++++++ 11 files changed, 2675 insertions(+), 65 deletions(-) create mode 100644 src/include/param.h create mode 100644 src/param.cc create mode 100644 src/proxy.cc create mode 100644 src/socket.cc create mode 100644 src/utils.cc diff --git a/Makefile b/Makefile index 8b0771e1..266187ed 100644 --- a/Makefile +++ b/Makefile @@ -77,7 +77,7 @@ endif BUILDDIR ?= $(abspath ./build) ABSBUILDDIR := $(abspath $(BUILDDIR)) -BUILDSRCS := init.cc debug.cc bootstrap.cc +BUILDSRCS := init.cc debug.cc bootstrap.cc utils.cc param.cc socket.cc proxy.cc BUILDOBJS := $(patsubst %.cc,$(ABSBUILDDIR)/src/%.o,$(BUILDSRCS)) TESTSSRCS := init_test.cc @@ -99,7 +99,7 @@ $(ABSBUILDDIR)/%.o: %.cc $(TESTBINS): %: %.o $(BUILDOBJS) @mkdir -p $(@D) - $(CXX) -o $@ $^ $(NVLDFLAGS) + $(NVCC) -o $@ $^ $(NVLDFLAGS) clean: rm -rf $(ABSBUILDDIR) diff --git a/src/debug.cc b/src/debug.cc index dd0047f8..3c0f663b 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -1,63 +1,205 @@ -#include -#include -#include -#include +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "core.h" +#include "mscclpp_net.h" +#include #include -#include "debug.h" +#include -using namespace std; +int mscclppDebugLevel = -1; +static int pid = -1; +static char hostname[1024]; +thread_local int mscclppDebugNoWarn = 0; +char mscclppLastError[1024] = ""; // Global string for the last error in human readable form +uint64_t mscclppDebugMask = MSCCLPP_INIT; // Default debug sub-system mask is INIT +FILE *mscclppDebugFile = stdout; +pthread_mutex_t mscclppDebugLock = PTHREAD_MUTEX_INITIALIZER; +std::chrono::steady_clock::time_point mscclppEpoch; -int mscclDebugLevel = -1; +static __thread int tid = -1; -void mscclppDebugInit() -{ - int lev = -1; - const char *mscclpp_debug = getenv("MSCCLPP_DEBUG"); - if (mscclpp_debug == nullptr) { - lev = MSCCLPP_LOG_NONE; - } else { - string mscclpp_debug_str(mscclpp_debug); - if (mscclpp_debug_str == "INFO") { - lev = MSCCLPP_LOG_INFO; - } else if (mscclpp_debug_str == "DEBUG") { - lev = MSCCLPP_LOG_DEBUG; - } else if (mscclpp_debug_str == "ABORT") { - lev = MSCCLPP_LOG_ABORT; - } else { - throw runtime_error("Unknown debug level given: " + mscclpp_debug_str); +void mscclppDebugInit() { + pthread_mutex_lock(&mscclppDebugLock); + if (mscclppDebugLevel != -1) { pthread_mutex_unlock(&mscclppDebugLock); return; } + const char* mscclpp_debug = getenv("MSCCLPP_DEBUG"); + int tempNcclDebugLevel = -1; + if (mscclpp_debug == NULL) { + tempNcclDebugLevel = MSCCLPP_LOG_NONE; + } else if (strcasecmp(mscclpp_debug, "VERSION") == 0) { + tempNcclDebugLevel = MSCCLPP_LOG_VERSION; + } else if (strcasecmp(mscclpp_debug, "WARN") == 0) { + tempNcclDebugLevel = MSCCLPP_LOG_WARN; + } else if (strcasecmp(mscclpp_debug, "INFO") == 0) { + tempNcclDebugLevel = MSCCLPP_LOG_INFO; + } else if (strcasecmp(mscclpp_debug, "ABORT") == 0) { + tempNcclDebugLevel = MSCCLPP_LOG_ABORT; + } else if (strcasecmp(mscclpp_debug, "TRACE") == 0) { + tempNcclDebugLevel = MSCCLPP_LOG_TRACE; + } + + /* Parse the MSCCLPP_DEBUG_SUBSYS env var + * This can be a comma separated list such as INIT,COLL + * or ^INIT,COLL etc + */ + char* mscclppDebugSubsysEnv = getenv("MSCCLPP_DEBUG_SUBSYS"); + if (mscclppDebugSubsysEnv != NULL) { + int invert = 0; + if (mscclppDebugSubsysEnv[0] == '^') { invert = 1; mscclppDebugSubsysEnv++; } + mscclppDebugMask = invert ? ~0ULL : 0ULL; + char *mscclppDebugSubsys = strdup(mscclppDebugSubsysEnv); + char *subsys = strtok(mscclppDebugSubsys, ","); + while (subsys != NULL) { + uint64_t mask = 0; + if (strcasecmp(subsys, "INIT") == 0) { + mask = MSCCLPP_INIT; + } else if (strcasecmp(subsys, "COLL") == 0) { + mask = MSCCLPP_COLL; + } else if (strcasecmp(subsys, "P2P") == 0) { + mask = MSCCLPP_P2P; + } else if (strcasecmp(subsys, "SHM") == 0) { + mask = MSCCLPP_SHM; + } else if (strcasecmp(subsys, "NET") == 0) { + mask = MSCCLPP_NET; + } else if (strcasecmp(subsys, "GRAPH") == 0) { + mask = MSCCLPP_GRAPH; + } else if (strcasecmp(subsys, "TUNING") == 0) { + mask = MSCCLPP_TUNING; + } else if (strcasecmp(subsys, "ENV") == 0) { + mask = MSCCLPP_ENV; + } else if (strcasecmp(subsys, "ALLOC") == 0) { + mask = MSCCLPP_ALLOC; + } else if (strcasecmp(subsys, "CALL") == 0) { + mask = MSCCLPP_CALL; + } else if (strcasecmp(subsys, "ALL") == 0) { + mask = MSCCLPP_ALL; + } + if (mask) { + if (invert) mscclppDebugMask &= ~mask; else mscclppDebugMask |= mask; + } + subsys = strtok(NULL, ","); + } + free(mscclppDebugSubsys); + } + + // Cache pid and hostname + getHostName(hostname, 1024, '.'); + pid = getpid(); + + /* Parse and expand the MSCCLPP_DEBUG_FILE path and + * then create the debug file. But don't bother unless the + * MSCCLPP_DEBUG level is > VERSION + */ + const char* mscclppDebugFileEnv = getenv("MSCCLPP_DEBUG_FILE"); + if (tempNcclDebugLevel > MSCCLPP_LOG_VERSION && mscclppDebugFileEnv != NULL) { + int c = 0; + char debugFn[PATH_MAX+1] = ""; + char *dfn = debugFn; + while (mscclppDebugFileEnv[c] != '\0' && c < PATH_MAX) { + if (mscclppDebugFileEnv[c++] != '%') { + *dfn++ = mscclppDebugFileEnv[c-1]; + continue; + } + switch (mscclppDebugFileEnv[c++]) { + case '%': // Double % + *dfn++ = '%'; + break; + case 'h': // %h = hostname + dfn += snprintf(dfn, PATH_MAX, "%s", hostname); + break; + case 'p': // %p = pid + dfn += snprintf(dfn, PATH_MAX, "%d", pid); + break; + default: // Echo everything we don't understand + *dfn++ = '%'; + *dfn++ = mscclppDebugFileEnv[c-1]; + break; + } + } + *dfn = '\0'; + if (debugFn[0] != '\0') { + FILE *file = fopen(debugFn, "w"); + if (file != nullptr) { + setbuf(file, nullptr); // disable buffering + mscclppDebugFile = file; + } } } - mscclDebugLevel = lev; + + mscclppEpoch = std::chrono::steady_clock::now(); + __atomic_store_n(&mscclppDebugLevel, tempNcclDebugLevel, __ATOMIC_RELEASE); + pthread_mutex_unlock(&mscclppDebugLock); } -void mscclppDebugLog(mscclDebugLogLevel level, const char *filefunc, int line, - const char *fmt, ...) -{ - if (mscclDebugLevel == -1) { - mscclppDebugInit(); - } - if (level < mscclDebugLevel) { - return; - } - string lev_str; +/* Common logging function used by the INFO, WARN and TRACE macros + * Also exported to the dynamically loadable Net transport modules so + * they can share the debugging mechanisms and output files + */ +void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char *filefunc, int line, const char *fmt, ...) { + if (__atomic_load_n(&mscclppDebugLevel, __ATOMIC_ACQUIRE) == -1) mscclppDebugInit(); + if (mscclppDebugNoWarn != 0 && level == MSCCLPP_LOG_WARN) { level = MSCCLPP_LOG_INFO; flags = mscclppDebugNoWarn; } + + // Save the last error (WARN) as a human readable string if (level == MSCCLPP_LOG_WARN) { - lev_str = "WARN"; - } else if (level == MSCCLPP_LOG_INFO) { - lev_str = "INFO"; - } else if (level == MSCCLPP_LOG_DEBUG) { - lev_str = "DEBUG"; - } else if (level == MSCCLPP_LOG_ABORT) { - lev_str = "ABORT"; - } else { - assert(false); + pthread_mutex_lock(&mscclppDebugLock); + va_list vargs; + va_start(vargs, fmt); + (void) vsnprintf(mscclppLastError, sizeof(mscclppLastError), fmt, vargs); + va_end(vargs); + pthread_mutex_unlock(&mscclppDebugLock); } + if (mscclppDebugLevel < level || ((flags & mscclppDebugMask) == 0)) return; + + if (tid == -1) { + tid = syscall(SYS_gettid); + } + + int cudaDev; + if (!(level == MSCCLPP_LOG_TRACE && flags == MSCCLPP_CALL)) { + cudaGetDevice(&cudaDev); + } + char buffer[1024]; + size_t len = 0; + if (level == MSCCLPP_LOG_WARN) { + len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d MSCCLPP WARN ", + hostname, pid, tid, cudaDev, filefunc, line); + } else if (level == MSCCLPP_LOG_INFO) { + len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] MSCCLPP INFO ", hostname, pid, tid, cudaDev); + } else if (level == MSCCLPP_LOG_TRACE && flags == MSCCLPP_CALL) { + len = snprintf(buffer, sizeof(buffer), "%s:%d:%d MSCCLPP CALL ", hostname, pid, tid); + } else if (level == MSCCLPP_LOG_TRACE) { + auto delta = std::chrono::steady_clock::now() - mscclppEpoch; + double timestamp = std::chrono::duration_cast>(delta).count()*1000; + len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] %f %s:%d MSCCLPP TRACE ", + hostname, pid, tid, cudaDev, timestamp, filefunc, line); + } + + if (len) { + va_list vargs; + va_start(vargs, fmt); + len += vsnprintf(buffer+len, sizeof(buffer)-len, fmt, vargs); + va_end(vargs); + buffer[len++] = '\n'; + fwrite(buffer, 1, len, mscclppDebugFile); + } +} + +MSCCLPP_PARAM(SetThreadName, "SET_THREAD_NAME", 0); + +void mscclppSetThreadName(pthread_t thread, const char *fmt, ...) { + // pthread_setname_np is nonstandard GNU extension + // needs the following feature test macro +#ifdef _GNU_SOURCE + if (mscclppParamSetThreadName() != 1) return; + char threadName[MSCCLPP_THREAD_NAMELEN]; va_list vargs; va_start(vargs, fmt); - vsnprintf(buffer, 1024, fmt, vargs); + vsnprintf(threadName, MSCCLPP_THREAD_NAMELEN, fmt, vargs); va_end(vargs); - stringstream ss; - ss << "MSCCL " << lev_str << ": (" << filefunc << ":" << line << ") " - << buffer << endl; - cerr << ss.str(); + pthread_setname_np(thread, threadName); +#endif } diff --git a/src/include/comm.h b/src/include/comm.h index 5f850d70..a65b4697 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -251,7 +251,7 @@ struct mscclppComm { // char intraPad2[64 - sizeof(uint64_t)]; // uint64_t intraBarrierGate; // only used if this is intraComm0 -// struct mscclppProxyState proxyState; + struct mscclppProxyState proxyState; // // Whether this communicator uses collNet // int collNetSupport; diff --git a/src/include/core.h b/src/include/core.h index 54facae5..7a5900a9 100644 --- a/src/include/core.h +++ b/src/include/core.h @@ -17,6 +17,7 @@ #include "mscclpp.h" #include "debug.h" #include "alloc.h" +#include "param.h" /* #ifdef PROFAPI @@ -62,7 +63,6 @@ static __inline__ int mscclppTypeSize(mscclppDataType_t type) { #include "checks.h" #include "cudawrap.h" #include "utils.h" -#include "param.h" #include "nvtx.h" */ diff --git a/src/include/param.h b/src/include/param.h new file mode 100644 index 00000000..19e86df1 --- /dev/null +++ b/src/include/param.h @@ -0,0 +1,29 @@ +/************************************************************************* + * Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef MSCCLPP_PARAM_H_ +#define MSCCLPP_PARAM_H_ + +#include + +const char* userHomeDir(); +void setEnvFile(const char* fileName); +void initEnv(); + +void mscclppLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache); + +#define MSCCLPP_PARAM(name, env, deftVal) \ + int64_t mscclppParam##name() { \ + constexpr int64_t uninitialized = INT64_MIN; \ + static_assert(deftVal != uninitialized, "default value cannot be the uninitialized value."); \ + static int64_t cache = uninitialized; \ + if (__builtin_expect(__atomic_load_n(&cache, __ATOMIC_RELAXED) == uninitialized, false)) { \ + mscclppLoadParam("MSCCLPP_" env, deftVal, uninitialized, &cache); \ + } \ + return cache; \ + } + +#endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 0d602adc..e47dcf0a 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -161,22 +161,22 @@ // int nextOps; // }; -// struct mscclppProxyState { -// // Service thread -// pthread_t thread; -// struct mscclppSocket* listenSock; -// int stop; +struct mscclppProxyState { + // Service thread + pthread_t thread; + struct mscclppSocket* listenSock; + int stop; // CUcontext cudaCtx; -// // Used by main thread -// union mscclppSocketAddress* peerAddresses; -// struct mscclppSocket* peerSocks; + // Used by main thread + union mscclppSocketAddress* peerAddresses; + struct mscclppSocket* peerSocks; // struct mscclppProxyOps* proxyOps; // void** sharedDevMems; -// // Progress thread + // Progress thread // struct mscclppProxyProgressState progressState; -// }; +}; // enum proxyConnectState { // connUninitialized = 0, diff --git a/src/init_test.cc b/src/init_test.cc index 8cbac7a4..de6e5e61 100644 --- a/src/init_test.cc +++ b/src/init_test.cc @@ -1,4 +1,3 @@ -#include #include "debug.h" #include "mscclpp.h" @@ -7,8 +6,9 @@ int main() mscclppUniqueId uid; mscclppResult_t res = mscclppGetUniqueId(&uid); if (res != mscclppSuccess) { - ABORT("mscclppGetUniqueId failed"); + printf("mscclppGetUniqueId failed"); + return -1; } - INFO("init_test succeed"); + printf("Succeeded!\n"); return 0; } diff --git a/src/param.cc b/src/param.cc new file mode 100644 index 00000000..1bd40c26 --- /dev/null +++ b/src/param.cc @@ -0,0 +1,81 @@ +/************************************************************************* + * Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "param.h" +#include "debug.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const char* userHomeDir() { + struct passwd *pwUser = getpwuid(getuid()); + return pwUser == NULL ? NULL : pwUser->pw_dir; +} + +void setEnvFile(const char* fileName) { + FILE * file = fopen(fileName, "r"); + if (file == NULL) return; + + char *line = NULL; + char envVar[1024]; + char envValue[1024]; + size_t n = 0; + ssize_t read; + while ((read = getline(&line, &n, file)) != -1) { + if (line[read-1] == '\n') line[read-1] = '\0'; + int s=0; // Env Var Size + while (line[s] != '\0' && line[s] != '=') s++; + if (line[s] == '\0') continue; + strncpy(envVar, line, std::min(1023,s)); + envVar[s] = '\0'; + s++; + strncpy(envValue, line+s, 1023); + envValue[1023]='\0'; + setenv(envVar, envValue, 0); + //printf("%s : %s->%s\n", fileName, envVar, envValue); + } + if (line) free(line); + fclose(file); +} + +void initEnv() { + char confFilePath[1024]; + const char * userDir = userHomeDir(); + if (userDir) { + sprintf(confFilePath, "%s/.mscclpp.conf", userDir); + setEnvFile(confFilePath); + } + sprintf(confFilePath, "/etc/mscclpp.conf"); + setEnvFile(confFilePath); +} + +void mscclppLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache) { + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&mutex); + if (__atomic_load_n(cache, __ATOMIC_RELAXED) == uninitialized) { + char* str = getenv(env); + int64_t value = deftVal; + if (str && strlen(str) > 0) { + errno = 0; + value = strtoll(str, nullptr, 0); + if (errno) { + value = deftVal; + INFO(MSCCLPP_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal); + } else { + INFO(MSCCLPP_ALL,"%s set by environment to %lld.", env, (long long)value); + } + } + __atomic_store_n(cache, value, __ATOMIC_RELAXED); + } + pthread_mutex_unlock(&mutex); +} diff --git a/src/proxy.cc b/src/proxy.cc new file mode 100644 index 00000000..189a1587 --- /dev/null +++ b/src/proxy.cc @@ -0,0 +1,1245 @@ +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "comm.h" +// #include "info.h" +// #include "collectives.h" +#include "socket.h" +// #include "shm.h" +// #include "profiler.h" +// #define ENABLE_TIMER 0 +// #include "timer.h" + +#include + +// enum { proxyRecv=0, proxySend=1 }; + +// static bool NeedProxy(int type, int pattern, int root, struct mscclppRing* ring, int nranks) { +// if (pattern == mscclppPatternRing || pattern == mscclppPatternRingTwice) return true; + +// /* In chains, one rank does not need a proxy. Let's figure out which one it is */ +// /* Which index in the reorganized rings should we compare root against */ +// const int myrank = 0, nextrank = 1, prevrank = nranks-1; +// int index = pattern == mscclppPatternPipelineFrom ? +// /* no recv / no send if root = */ +// /* bcast */ (type == proxyRecv ? myrank : nextrank ): +// /* reduce */ (type == proxyRecv ? prevrank : myrank ); +// int rank = ring->userRanks[index]; +// return (root != rank); +// } + +// #define PROXYARGS_ALLOCATE_SIZE MSCCLPP_MAX_OPS +// struct mscclppProxyPool { +// struct mscclppProxyPool *next; +// struct mscclppProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; +// }; + +// static mscclppResult_t allocateArgs(struct mscclppProxyProgressState* state, struct mscclppProxyArgs** argsptr) { +// struct mscclppProxyArgs* elem; +// if (state->pool == NULL) { +// // Allocate a new pool of elements. Make sure we allocate the memory close +// // to the network thread +// struct mscclppProxyPool* newPool; +// MSCCLPPCHECK(mscclppCalloc(&newPool, 1)); + +// struct mscclppProxyArgs* newElems = newPool->elems; +// // Chain newly allocated elements +// for (int i=0; ipool = newElems; +// // Save the pool memory block for later resource release +// newPool->next = state->pools; +// state->pools = newPool; +// } +// elem = state->pool; +// state->pool = state->pool->next; +// elem->next = elem->nextPeer = NULL; +// *argsptr = elem; +// return mscclppSuccess; +// } + +// //#define DEBUG_PROXY 1 +// #ifdef DEBUG_PROXY +// #define DEBUG_PROXY_PRINT printf +// #else +// #define DEBUG_PROXY_PRINT(...) +// #endif + +// #define OP_INDEX(op) ((op) ? (op)-state->pools->elems : -1) +// #define OP_SEEN 0x100000 + +// mscclppResult_t getOpIndex(struct mscclppProxyArgs* op, struct mscclppProxyProgressState* state, int* poolIndex, int* opIndex) { +// struct mscclppProxyPool* pool = state->pools; +// int p = 0; +// while (pool) { +// uint64_t o = op-pool->elems; +// if (o < PROXYARGS_ALLOCATE_SIZE) { +// *opIndex = o; +// *poolIndex = p; +// return mscclppSuccess; +// } +// pool = pool->next; +// p++; +// } +// WARN("Could not find pool of op %p\n", op); +// return mscclppInternalError; +// } + +// mscclppResult_t printProxyOp(struct mscclppProxyArgs* op, int poolIndex, int opIndex) { +// printf("[%d-%d|%ld| %s", poolIndex, opIndex, op->opCount, op->pattern == mscclppPatternSend ? "Send" : op->pattern == mscclppPatternRecv ? "Recv" : "Coll"); +// for (int s=0; snsubs; s++) { +// struct mscclppProxySubArgs* sub = op->subs+s; +// if (op->state == mscclppProxyOpProgress) { +// char status = ' '; +// if (op->pattern == mscclppPatternRecv) { +// if (sub->posted < sub->nsteps && sub->posted < sub->done + MSCCLPP_STEPS) status = 'I'; // Init +// else if (sub->received < sub->posted) status = 'R'; // Receiving +// else if (sub->received < sub->transmitted) status = 'R'; // Receiving +// else if (sub->transmitted < sub->received) status = 'F'; // Flushing +// else if (sub->done < sub->transmitted) status = 'G'; // Waiting on GPU +// else status = 'D'; // Done +// } else if (op->pattern == mscclppPatternSend) { +// if (sub->posted < sub->nsteps && sub->posted < sub->done + MSCCLPP_STEPS) status = 'I'; // Init +// else if (sub->transmitted < sub->posted) status = 'G'; // Waiting on GPU +// else if (sub->done < sub->transmitted) status = 'S'; // Sending +// else status = 'D'; // Done +// } +// printf(" %d%c/%d", sub->peer, status, sub->channelId); +// } else { +// printf(" %d/%d", sub->peer, sub->channelId); +// } +// } +// printf("]"); +// return mscclppSuccess; +// } +// mscclppResult_t dumpProxyState(struct mscclppProxyProgressState* state) { +// struct mscclppProxyArgs* op = state->active; +// int poolIndex, opIndex; +// printf("ACTIVE OPS\n"); +// while (op) { +// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); +// if (op->state & OP_SEEN) { +// WARN("List loop at element %d-%d", poolIndex, opIndex); +// } +// MSCCLPPCHECK(printProxyOp(op, poolIndex, opIndex)); +// op->state |= OP_SEEN; +// printf("\n"); +// struct mscclppProxyArgs* nextOp = op->nextPeer; +// while (nextOp) { +// MSCCLPPCHECK(getOpIndex(nextOp, state, &poolIndex, &opIndex)); +// if (nextOp->state & OP_SEEN) { +// WARN("List loop at element %d-%d", poolIndex, opIndex); +// } +// printf("| `-> "); +// MSCCLPPCHECK(printProxyOp(nextOp, poolIndex, opIndex)); +// nextOp->state |= OP_SEEN; +// printf("\n"); +// if (nextOp->next) { +// WARN("Inactive op has next set!\n"); +// } +// nextOp = nextOp->nextPeer; +// } +// if (op->nextPeer == NULL) printf("|\n"); +// op = op->next; +// printf("v\n"); +// } +// printf("[X]\n"); + +// # if 0 +// printf("FREE OPS\n"); +// op = state->pool; +// while (op) { +// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); +// if (op->state & OP_SEEN) { +// WARN("List loop at element %d-%d", poolIndex, opIndex); +// } +// MSCCLPPCHECK(printProxyOp(op, poolIndex, opIndex)); +// op->state |= OP_SEEN; +// printf("->"); +// op = op->next; +// } +// printf("[X]\n"); +// #else +// op = state->pool; +// while (op) { +// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); +// if (op->state & OP_SEEN) { +// WARN("List loop at element %d-%d", poolIndex, opIndex); +// } +// op->state |= OP_SEEN; +// op = op->next; +// } +// #endif + +// struct mscclppProxyPool* pool = state->pools; +// poolIndex = 0; +// while (pool) { +// struct mscclppProxyArgs* elem = pool->elems; +// for (int e=0; estate & OP_SEEN) == 0) { +// printf("Elem %d-%d is not in any list:\n", poolIndex, e); +// MSCCLPPCHECK(printProxyOp(elem, poolIndex, e)); +// printf("\n"); +// } else { +// elem->state -= OP_SEEN; +// } +// } +// pool = pool->next; +// poolIndex++; +// } +// return mscclppSuccess; +// } + +// static mscclppResult_t mscclppProxyOpToArgs(struct mscclppProxyOp* op, struct mscclppProxyArgs* args, int subIndex) { +// struct mscclppProxySubArgs* sub = args->subs+subIndex; +// if (subIndex >= MSCCLPP_PROXY_MAX_SUBS) { +// WARN("Proxy append out of bounds"); +// return mscclppInternalError; +// } + +// //memset(sub, 0, sizeof(struct mscclppProxySubArgs)); +// sub->connection = op->connection; +// sub->channelId = op->channelId; +// sub->nsteps = op->nsteps; +// sub->nbytes = op->nbytes; +// sub->peer = op->root; +// args->nsubs = subIndex+1; +// if (subIndex) { +// if ((args->sliceSteps != op->sliceSteps) || +// (args->chunkSteps != op->chunkSteps) || +// (args->protocol != op->protocol) || +// (args->dtype != op->dtype) || +// (args->redOp != op->redOp)) { +// WARN("Proxy append mismatch"); +// return mscclppInternalError; +// } +// if (args->state != mscclppProxyOpReady) { +// WARN("Proxy append on running operation"); +// return mscclppInternalError; +// } +// return mscclppSuccess; +// } +// //memset(&args->progress, 0, sizeof(struct mscclppProxyArgs)-offsetof(struct mscclppProxyArgs, progress)); +// args->done = 0; +// args->opCount = op->opCount; +// args->sliceSteps = op->sliceSteps; +// args->chunkSteps = op->chunkSteps; +// args->chunkSize = op->chunkSize; +// args->dtype = op->dtype; +// args->redOp = op->redOp; +// args->pattern = op->pattern; +// args->protocol = op->protocol; +// args->state = mscclppProxyOpReady; +// args->progress = op->connection->tcomm->proxyProgress; +// args->proxyAppendPtr = op->connection->proxyAppendPtr; +// return mscclppSuccess; +// } + +// static mscclppResult_t ProxyAppend(struct mscclppProxyProgressState* state, struct mscclppProxyOp* op) { +// struct mscclppProxyConnection* connection = op->connection; +// int shared = connection->shared; +// struct mscclppProxyArgs* args = *connection->proxyAppendPtr; + +// if (args) { +// if (shared && args->opCount == op->opCount) { +// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, args->nsubs)); +// DEBUG_PROXY_PRINT("Insert (%d/%5ld/%5ld) as group with %5ld\n", shared, args->opCount, op->opCount, OP_INDEX(args)); +// } else { +// struct mscclppProxyArgs* prevArgs = args; +// MSCCLPPCHECK(allocateArgs(state, &args)); +// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, 0)); +// prevArgs->nextPeer = args; +// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld\n", OP_INDEX(args), shared, prevArgs->opCount, args->opCount, OP_INDEX(prevArgs)); +// *(args->proxyAppendPtr) = args; +// } +// } else { +// // Nothing running for that peer. Add to the list +// MSCCLPPCHECK(allocateArgs(state, &args)); +// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, 0)); +// if (state->active == NULL) { +// // Create the list +// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element\n", OP_INDEX(args), shared, args->opCount); +// state->active = args; +// } else { +// // Append element at the end of the list +// struct mscclppProxyArgs* last = state->active; +// while (last->next) last = last->next; +// last->next = args; +// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element\n", OP_INDEX(args), shared, args->opCount); +// } +// *(args->proxyAppendPtr) = args; +// } +// return mscclppSuccess; +// } + +// mscclppResult_t mscclppProxyPost(struct mscclppProxyOpsPool* pool, int nextOps, int nextOpsEnd) { +// pthread_mutex_lock(&pool->mutex); +// if (pool->nextOps == -1) { +// pool->nextOps = nextOps; +// pthread_cond_signal(&pool->cond); +// } else { +// pool->ops[pool->nextOpsEnd].next = nextOps; +// } +// pool->nextOpsEnd = nextOpsEnd; +// pthread_mutex_unlock(&pool->mutex); +// return mscclppSuccess; +// } + +// mscclppResult_t mscclppLocalOpAppend(struct mscclppComm* comm, struct mscclppProxyConnector* proxyConn, struct mscclppProxyOp* proxyOp) { +// struct mscclppProxyOps* proxyOps = proxyConn->comm->proxyState.proxyOps; +// if (proxyOps == NULL) return mscclppInternalError; +// proxyOps += proxyConn->localRank; +// struct mscclppProxyOpsPool* pool = proxyOps->pool; + +// TIME_START(0); +// int opIndex = proxyOps->freeOp; +// struct mscclppProxyOp* op; +// if (opIndex != -1) { +// op = pool->ops+opIndex; +// proxyOps->freeOp = op->next; +// } else { +// int freeOp; +// while ((freeOp = pool->freeOps[comm->localRank]) == -1) sched_yield(); +// int freeOpNew; +// while ((freeOpNew = __sync_val_compare_and_swap(pool->freeOps+comm->localRank, freeOp, -1)) != freeOp) freeOp = freeOpNew; +// opIndex = freeOp; +// op = pool->ops+opIndex; +// proxyOps->freeOp = op->next; +// } +// if (op->next != -1) __builtin_prefetch(pool->ops+op->next); // Prefetch next free op +// memcpy(op, proxyOp, sizeof(struct mscclppProxyOp)); +// op->next = -1; +// op->connection = proxyConn->connection; +// if (proxyOps->nextOps == -1) { +// proxyOps->nextOps = proxyOps->nextOpsEnd = opIndex; +// } else { +// pool->ops[proxyOps->nextOpsEnd].next = opIndex; +// proxyOps->nextOpsEnd = opIndex; +// } +// if (++proxyOps->count == MAX_OPS_PER_PEER) { +// // Post what we have so far to free some ops in the pool +// // Do not post last operations as we could have more coming with the same opCount, and posting +// // them in different batches would break proxyArgs aggregation with subs. +// uint64_t lastOpCount = pool->ops[proxyOps->nextOpsEnd].opCount; +// int lastOp = -1; +// int toSend = 0; +// int ops = 0; +// for (int op= proxyOps->nextOps; op != proxyOps->nextOpsEnd; op=pool->ops[op].next) { +// ops++; +// if (pool->ops[op].opCount != lastOpCount) { +// lastOp = op; +// toSend = ops; +// } +// } +// if (lastOp == -1) { +// WARN("Unable to post incomplete proxy op chain %d..%d (opCount %ld)\n", proxyOps->nextOps, proxyOps->nextOpsEnd, lastOpCount); +// return mscclppInternalError; +// } +// // Cut chain at lastOp +// int nextOps = proxyOps->nextOps; +// proxyOps->nextOps = pool->ops[lastOp].next; +// pool->ops[lastOp].next = -1; +// MSCCLPPCHECK(mscclppProxyPost(proxyOps->pool, nextOps, lastOp)); +// proxyOps->count -= toSend; +// } +// TIME_STOP(0); +// return mscclppSuccess; +// } + +// static mscclppResult_t SaveProxy(struct mscclppChannel* channel, int type, int peer, struct mscclppProxyOp* op, int connIndex, bool* justInquire) { +// if (peer < 0) return mscclppSuccess; + +// struct mscclppChannelPeer* peerComm = channel->peers+peer; +// struct mscclppConnector* connector = type == proxyRecv ? peerComm->recv+connIndex : peerComm->send+connIndex; +// if (connector->transportComm == NULL) { +// WARN("Rank %d has no transport for %s peer %d on channel %d/%d", connector->comm->rank, +// type == proxyRecv ? "recv" : "send", peer, channel->id, connIndex); +// return mscclppInternalError; +// } +// if (connector->transportComm->proxyProgress == NULL) return mscclppSuccess; + +// if (justInquire) *justInquire = true; +// else { +// MSCCLPPCHECK(mscclppLocalOpAppend(connector->comm, &connector->proxyConn, op)); +// } +// return mscclppSuccess; +// } + +// // justInquire != nullptr means don't actually do anything, just assertain need of +// // mscclppProxySaveOp for this op. +// mscclppResult_t mscclppProxySaveOp(struct mscclppComm* comm, struct mscclppProxyOp* op, bool* justInquire) { +// struct mscclppChannel* channel = &comm->channels[op->channelId]; +// if (justInquire) *justInquire = false; +// switch (op->pattern) { +// case mscclppPatternRing: +// case mscclppPatternRingTwice: +// case mscclppPatternPipelineFrom: +// case mscclppPatternPipelineTo: { +// struct mscclppRing* ring = &channel->ring; +// if (NeedProxy(proxyRecv, op->pattern, op->root, ring, comm->nRanks)) { +// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, ring->prev, op, 0, justInquire)); +// } +// if (NeedProxy(proxySend, op->pattern, op->root, ring, comm->nRanks)) { +// MSCCLPPCHECK(SaveProxy(channel, proxySend, ring->next, op, 0, justInquire)); +// } +// } break; +// case mscclppPatternTreeUp: +// case mscclppPatternTreeDown: +// case mscclppPatternTreeUpDown: { +// if (op->pattern != mscclppPatternTreeDown) { // Tree up +// struct mscclppTree* tree = &channel->tree; +// for (int i=0; idown[i], op, 0, justInquire)); +// } +// MSCCLPPCHECK(SaveProxy(channel, proxySend, tree->up, op, 0, justInquire)); +// } +// if (op->pattern != mscclppPatternTreeUp) { // Tree down +// struct mscclppTree* tree = &channel->tree; +// for (int i=0; i< MSCCLPP_MAX_TREE_ARITY; i++) { +// MSCCLPPCHECK(SaveProxy(channel, proxySend, tree->down[i], op, 0, justInquire)); +// } +// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, tree->up, op, 0, justInquire)); +// } +// } break; +// case mscclppPatternCollnetChain: { +// MSCCLPPCHECK(SaveProxy(channel, proxySend, channel->collnetChain.up, op, 1, justInquire)); +// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, channel->collnetChain.up, op, 0, justInquire)); +// } break; +// case mscclppPatternCollnetDirect: { +// MSCCLPPCHECK(SaveProxy(channel, proxySend, channel->collnetDirect.out, op, 1, justInquire)); +// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, channel->collnetDirect.out, op, 0, justInquire)); +// } break; +// case mscclppPatternSend: +// case mscclppPatternRecv: { +// if (op->root == comm->rank) return mscclppSuccess; +// MSCCLPPCHECK(SaveProxy(channel, op->pattern == mscclppPatternSend ? proxySend : proxyRecv, op->root, op, 1, justInquire)); +// } break; +// } +// return mscclppSuccess; +// } + +// MSCCLPP_PARAM(ChunkSize, "CHUNK_SIZE", 0); + +// mscclppResult_t mscclppProxyComputeP2p(struct mscclppInfo* info, struct mscclppProxyOp* op) { +// memset(op, 0, sizeof(struct mscclppProxyOp)); +// int channelId = info->channelId; +// struct mscclppChannel* channel = info->comm->channels+channelId; +// op->channelId = channelId; +// op->sliceSteps = 1; +// op->chunkSteps = 1; +// op->dtype = info->datatype; +// op->protocol = info->protocol; + +// int stepSize = info->comm->buffSizes[op->protocol]/MSCCLPP_STEPS; + +// if (op->protocol == MSCCLPP_PROTO_SIMPLE) stepSize = info->comm->p2pChunkSize; +// info->chunkSize = stepSize; +// op->root = info->root; + +// struct mscclppChannelPeer* peer = channel->peers + op->root; +// if (info->coll == mscclppFuncSend) { +// op->pattern = mscclppPatternSend; +// if (op->root != info->comm->rank && peer->send[1].transportComm == &netTransport.send) { +// // Tune chunk size for the network +// if (info->count < stepSize) info->chunkSize /= 4; +// else if (info->count < 8*stepSize) info->chunkSize /= 2; +// } +// } else if (info->coll == mscclppFuncRecv) { +// op->pattern = mscclppPatternRecv; +// if (op->root != info->comm->rank && peer->recv[1].transportComm == &netTransport.recv) { +// // Tune chunk size for the network +// if (info->count < stepSize) info->chunkSize /= 4; +// else if (info->count < 8*stepSize) info->chunkSize /= 2; +// } +// } else { +// WARN("P2p operation is neither send or recv"); +// return mscclppInternalError; +// } +// if (mscclppParamChunkSize() != 0) { +// info->chunkSize = mscclppParamChunkSize(); +// } +// op->chunkSize = info->chunkSize; + +// // Compute nSteps for proxies +// int chunkEffectiveSize = op->chunkSize; +// if (op->protocol == MSCCLPP_PROTO_LL) { +// chunkEffectiveSize /= 2; +// } + +// op->nbytes = stepSize; +// op->nsteps = DIVUP(info->count, chunkEffectiveSize); +// if (op->nsteps == 0) op->nsteps = 1; + +// return mscclppSuccess; +// } + +// static mscclppResult_t removeOp(struct mscclppProxyProgressState* state, struct mscclppProxyArgs** opPtr, struct mscclppProxyArgs** prevOpPtr) { +// struct mscclppProxyArgs* freeOp = *opPtr; +// struct mscclppProxyArgs* next = freeOp->next; +// DEBUG_PROXY_PRINT("Remove %ld -> %ld -> %ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(freeOp), OP_INDEX(next)); +// *opPtr = next; +// if (freeOp->nextPeer) { +// // replace op by nextPeer +// struct mscclppProxyArgs* nextPeer = freeOp->nextPeer; +// if (*prevOpPtr) { +// (*prevOpPtr)->next = nextPeer; +// } else { +// state->active = nextPeer; +// } +// nextPeer->next = next; +// *(prevOpPtr) = nextPeer; +// } else { +// *(freeOp->proxyAppendPtr) = NULL; +// if (*prevOpPtr) { +// (*prevOpPtr)->next = next; +// } else { +// state->active = next; +// } +// } +// freeOp->next = state->pool; +// state->pool = freeOp; +// DEBUG_PROXY_PRINT("Removed %5ld (%5ld) : ", OP_INDEX(freeOp), OP_INDEX(*freeOp->proxyAppendPtr)); +// #ifdef DEBUG_PROXY +// MSCCLPPCHECK(dumpProxyState(state)); +// #endif +// return mscclppSuccess; +// } + +// static mscclppResult_t progressOps(struct mscclppComm* comm, struct mscclppProxyProgressState* state, struct mscclppProxyArgs* opStart, int* idle) { +// struct mscclppProxyArgs* prevOp = NULL; +// struct mscclppProxyArgs* op = opStart; +// while (op) { +// if (op->state == mscclppProxyOpNone) return mscclppInternalError; +// TIME_START(0); TIME_START(1); +// MSCCLPPCHECK(op->progress(comm, op)); +// if (op->idle) { TIME_STOP(1); TIME_CANCEL(0); } else { TIME_CANCEL(1); TIME_STOP(0); } +// *idle &= op->idle; +// if (op->state == mscclppProxyOpNone) { +// TIME_START(2); +// MSCCLPPCHECK(removeOp(state, &op, &prevOp)); +// TIME_STOP(2); +// } else { +// prevOp = op; +// op = op->next; +// } +// } +// return mscclppSuccess; +// } + +// MSCCLPP_PARAM(ProxyAppendBatchSize, "PROXY_APPEND_BATCH_SIZE", 16); + +// static mscclppResult_t mscclppProxyGetPostedOps(struct mscclppComm* comm, int* added) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// if (state->opsPool == NULL) return mscclppInternalError; +// struct mscclppProxyOpsPool* pool = state->opsPool; + +// struct mscclppProxyArgs profArgs; // Only used for profiling purposes +// if (state->nextOps != -1) goto process_nextops; + +// // If we have ops to progress, no need to block waiting for something to arrive or even wait for the lock +// // to be available. Exit, continue progress, and come back later. +// if (state->active != NULL && (pool->nextOps == -1 || pthread_mutex_trylock(&pool->mutex) != 0)) return mscclppSuccess; + +// if (state->active == NULL) { +// pthread_mutex_lock(&pool->mutex); +// while (pool->nextOps == -1 && !state->stop) { +// struct mscclppProxyArgs profArgs; // Only used for profiling purposes +// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileSleep); +// pthread_cond_wait(&pool->cond, &pool->mutex); +// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileWakeup); +// } +// if (state->stop) { // We might have been woken up to stop. +// pthread_mutex_unlock(&pool->mutex); +// return mscclppSuccess; +// } +// } + +// state->nextOps = pool->nextOps; +// pool->nextOps = pool->nextOpsEnd = -1; +// pthread_mutex_unlock(&pool->mutex); +// if (state->nextOps == -1) return mscclppInternalError; + +// process_nextops: +// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileAppend); +// TIME_START(2); +// int freeOp[MSCCLPP_MAX_LOCAL_RANKS]; +// int freeOpEnd[MSCCLPP_MAX_LOCAL_RANKS]; +// for (int i=0; ilocalRanks; i++) freeOp[i] = -1; + +// uint64_t lastOpCount = 0; +// int lastPeer = -1; +// int count = 0; +// for (int opIndex = state->nextOps; opIndex != -1;) { +// struct mscclppProxyOp* peerOp = pool->ops+opIndex; +// int peer = opIndex / MAX_OPS_PER_PEER; +// if ((lastOpCount && peerOp->opCount != lastOpCount) || ((lastPeer != -1) && peer != lastPeer)) count++; +// if (count == mscclppParamProxyAppendBatchSize()+1) break; +// lastOpCount = peerOp->opCount; +// lastPeer = peer; +// if (peerOp->connection == NULL) return mscclppInternalError; +// if (peerOp->next != -1) __builtin_prefetch(pool->ops+peerOp->next); +// MSCCLPPCHECK(ProxyAppend(state, peerOp)); +// (*added)++; +// int lastOpIndex = opIndex; +// opIndex = peerOp->next; +// // Return op to peer pool +// if (freeOp[peer] == -1) { +// freeOpEnd[peer] = lastOpIndex; +// } else { +// peerOp->next = freeOp[peer]; +// } +// freeOp[peer] = lastOpIndex; +// state->nextOps = opIndex; +// } + +// for (int i=0; ilocalRanks; i++) { +// if (freeOp[i] == -1) continue; +// int newFree = freeOp[i]; +// int oldFree = pool->freeOps[i]; +// pool->ops[freeOpEnd[i]].next = oldFree; +// if (oldFree == -1) { +// // Nothing for the main thread to consume, we can set it. +// pool->freeOps[i] = newFree; +// } else { +// // The main thread may recycle free ops at any time, replace the freeOps value atomically and check it worked. +// int swap = __sync_val_compare_and_swap(pool->freeOps+i, oldFree, newFree); +// if (swap != oldFree) { +// if (swap != -1) return mscclppInternalError; +// // Ops were recycled while we were trying to swap, just set the value directly now. +// pool->ops[freeOpEnd[i]].next = -1; +// pool->freeOps[i] = newFree; +// } +// } +// } +// profArgs.opCount = *added; +// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileAppendEnd); +// TIME_STOP(2); +// return mscclppSuccess; +// } + +// #include +// static mscclppProxyProgressState* mscclppLastProxyState; +// void mscclppDumpProxyState(int signal) { +// dumpProxyState(mscclppLastProxyState); +// } + +// MSCCLPP_PARAM(CreateThreadContext, "CREATE_THREAD_CONTEXT", 0); +// mscclppResult_t mscclppSetThreadContext(struct mscclppComm* comm) { +// #if CUDART_VERSION >= 11030 +// static int createThreadContext = -1; + +// if (createThreadContext == -1) { +// createThreadContext = mscclppParamCreateThreadContext(); +// if (createThreadContext) { +// if (CUPFN(cuCtxCreate) == nullptr || CUPFN(cuCtxDestroy) == nullptr || CUPFN(cuCtxSetCurrent) == nullptr) { +// WARN("Unable to create thread context due to old driver, disabling."); +// createThreadContext = 0; +// } +// } +// } +// if (createThreadContext) { +// if (comm->proxyState.cudaCtx == NULL) { +// if (CUPFN(cuCtxCreate(&comm->proxyState.cudaCtx, +// CU_CTX_SCHED_SPIN|CU_CTX_MAP_HOST, comm->cudaDev)) != CUDA_SUCCESS) { +// WARN("Failed to create CUDA context on device %d", comm->cudaDev); +// createThreadContext = 0; +// return mscclppSuccess; +// } +// } else { +// if (CUPFN(cuCtxSetCurrent(comm->proxyState.cudaCtx)) != CUDA_SUCCESS) { +// WARN("Failed to set CUDA context on device %d", comm->cudaDev); +// return mscclppUnhandledCudaError; +// } +// } +// } +// #endif +// return mscclppSuccess; +// } + +// // Set to SIGUSR1 or SIGUSR2 to help debug proxy state during hangs +// MSCCLPP_PARAM(ProxyDumpSignal, "PROXY_DUMP_SIGNAL", -1); + +// void* mscclppProxyProgress(void *comm_) { +// struct mscclppComm* comm = (struct mscclppComm*)comm_; +// if (mscclppSetThreadContext(comm) != mscclppSuccess) { +// WARN("[Proxy Progress] Failed to set CUDA context on device %d", comm->cudaDev); +// } else if (cudaSetDevice(comm->cudaDev) != cudaSuccess) { +// WARN("[Proxy Progress] Failed to set CUDA device %d", comm->cudaDev); +// } +// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); + +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// state->nextOps = -1; +// const int sig = mscclppParamProxyDumpSignal(); +// if (sig != -1) signal(sig, mscclppDumpProxyState); +// mscclppLastProxyState = state; +// char threadName[MSCCLPP_THREAD_NAMELEN]; +// snprintf(threadName, MSCCLPP_THREAD_NAMELEN, "MSCCLPP Progress%2d", comm->cudaDev); +// nvtxNameOsThreadA(syscall(SYS_gettid), threadName); + +// int lastIdle = 0; +// struct mscclppProxyArgs profArgs; // Only used for profiling purposes +// while ((state->stop == false || (state->stop == true && state->active)) && *comm->abortFlag == 0) { +// int idle = 1; +// mscclppResult_t ret = progressOps(comm, state, state->active, &idle); +// if (ret != mscclppSuccess) { +// (void) mscclppCommSetAsyncError(comm, ret); +// INFO(MSCCLPP_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); +// return NULL; +// } +// if (lastIdle == 0 && idle == 1) mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileIdle); +// if (lastIdle == 1 && idle == 0) mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileActive); +// int added = 0; +// TIME_START(3); +// if (state->stop == false) +// ret = mscclppProxyGetPostedOps(comm, &added); +// if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } +// if (ret != mscclppSuccess) { +// (void) mscclppCommSetAsyncError(comm, ret); +// INFO(MSCCLPP_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); +// } +// if (added == 0) { +// sched_yield(); // No request progressed. Let others run. +// } +// lastIdle = idle; +// } +// return NULL; +// } + +// mscclppResult_t mscclppProxyStart(struct mscclppComm* comm) { +// struct mscclppProxyOps* proxyOps = comm->proxyState.proxyOps; +// if (proxyOps == NULL) return mscclppSuccess; +// TIME_START(1); +// for (int r=0; rlocalRanks; r++) { +// struct mscclppProxyOps* ops = proxyOps+r; +// if (ops->pool == NULL || ops->nextOps == -1) continue; +// MSCCLPPCHECK(mscclppProxyPost(ops->pool, ops->nextOps, ops->nextOpsEnd)); +// ops->nextOps = ops->nextOpsEnd = -1; +// ops->count = 0; +// } +// comm->opCount++; +// TIME_STOP(1); +// return mscclppSuccess; +// } + +// mscclppResult_t mscclppProxyProgressCreate(struct mscclppComm* comm) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// if (!state->thread) { +// pthread_create(&state->thread, NULL, mscclppProxyProgress, comm); +// mscclppSetThreadName(state->thread, "MSCCLPP Progress%2d", comm->cudaDev); +// } +// return mscclppSuccess; +// } + +// mscclppResult_t mscclppProxyProgressDestroy(struct mscclppComm* comm) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; + +// // Request the proxy to stop and then wake it +// if (state->opsPool) { +// pthread_mutex_lock(&state->opsPool->mutex); +// state->stop = true; +// pthread_cond_signal(&state->opsPool->cond); +// pthread_mutex_unlock(&state->opsPool->mutex); +// pthread_join(state->thread, NULL); +// } + +// // Free off any memory allocated for the proxy arg pools +// while (state->pools != NULL) { +// struct mscclppProxyPool *next = state->pools->next; +// free(state->pools); +// state->pools = next; +// } + +// mscclppProfilingDump(); +// TIME_PRINT("Proxy"); +// return mscclppSuccess; +// } + +// struct mscclppProxyAsyncOp { +// int type; +// struct mscclppProxyConnection* connection; +// int reqSize, respSize; +// char *reqBuff, *respBuff; +// }; + +// struct mscclppProxyLocalPeer { +// struct mscclppSocket sock; +// int localRank; +// struct mscclppProxyAsyncOp asyncOps; +// }; + +// #define MSCCLPP_PROXY_CONN_POOL_SIZE_POW2 7 +// #define MSCCLPP_PROXY_CONN_POOL_SIZE (1<<(MSCCLPP_PROXY_CONN_POOL_SIZE_POW2)) +// #define MSCCLPP_PROXY_CONN_POOL_MASK ((MSCCLPP_PROXY_CONN_POOL_SIZE)-1) +// struct mscclppProxyConnectionPool { +// struct mscclppProxyConnection** pools; +// int banks; +// int offset; +// struct mscclppProxyAsyncOp* ops; +// }; + +// static mscclppResult_t mscclppProxyNewConnection(struct mscclppProxyConnectionPool* pool, int* id) { +// if (pool->offset == MSCCLPP_PROXY_CONN_POOL_SIZE) { +// MSCCLPPCHECK(mscclppRealloc(&pool->pools, pool->banks, pool->banks+1)); +// MSCCLPPCHECK(mscclppCalloc(pool->pools+pool->banks, MSCCLPP_PROXY_CONN_POOL_SIZE)); +// pool->banks++; +// pool->offset = 0; +// } +// *id = ((pool->banks-1) << MSCCLPP_PROXY_CONN_POOL_SIZE_POW2) + pool->offset; +// pool->offset++; +// return mscclppSuccess; +// } + +// static mscclppResult_t mscclppProxyGetConnection(struct mscclppProxyConnectionPool* pool, int id, struct mscclppProxyConnection** conn) { +// int bank = id>>MSCCLPP_PROXY_CONN_POOL_SIZE_POW2; +// int offset = id&MSCCLPP_PROXY_CONN_POOL_MASK; +// if ((pool->pools == NULL) || (bank > pool->banks) || (pool->pools[bank] == NULL)) return mscclppInternalError; +// *conn = pool->pools[bank]+offset; +// return mscclppSuccess; +// } + +// static mscclppResult_t proxyFree(struct mscclppProxyConnection* connection, struct mscclppComm* comm) { +// if (connection->send) { +// if (mscclppTransports[connection->transport]->send.proxyFree) { +// MSCCLPPCHECK(mscclppTransports[connection->transport]->send.proxyFree(connection, comm)); +// } +// } else { +// if (mscclppTransports[connection->transport]->recv.proxyFree) { +// MSCCLPPCHECK(mscclppTransports[connection->transport]->recv.proxyFree(connection, comm)); +// } +// } +// return mscclppSuccess; +// } + +// static mscclppResult_t mscclppProxyFreeConnections(struct mscclppProxyConnectionPool* pool, struct mscclppComm* comm) { +// for (int b=0; bbanks; b++) { +// int max = b == pool->banks-1 ? pool->offset : MSCCLPP_PROXY_CONN_POOL_SIZE; +// for (int i=0; ipools[b]+i; +// if (connection->state != connUninitialized) { +// MSCCLPPCHECK(proxyFree(connection, comm)); +// } +// } +// free(pool->pools[b]); +// } +// free(pool->pools); +// return mscclppSuccess; +// } + +// #include "transport.h" + +// mscclppResult_t mscclppProxyConnect(struct mscclppComm* comm, int transport, int send, int rank, struct mscclppProxyConnector* proxyConn) { +// struct mscclppSocket* sock; +// int ready; +// int type = mscclppProxyMsgInit; + +// // Keep one connection per mlocal rank +// proxyConn->connection = NULL; +// proxyConn->rank = rank; +// if (comm->proxyState.peerSocks == NULL) { +// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.peerSocks, comm->localRanks)); +// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.proxyOps, comm->localRanks)); +// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.sharedDevMems, comm->localRanks)); +// for (int i = 0; i < comm->localRanks; ++i) { +// MSCCLPPCHECK(mscclppSocketSetFd(-1, &comm->proxyState.peerSocks[i])); +// } +// } + +// MSCCLPPCHECK(mscclppTopoGetLocalRank(comm->topo, rank, &proxyConn->localRank)); +// sock = comm->proxyState.peerSocks + proxyConn->localRank; +// MSCCLPPCHECK(mscclppSocketReady(sock, &ready)); +// if (!ready) { +// MSCCLPPCHECK(mscclppSocketInit(sock, comm->proxyState.peerAddresses+rank, comm->magic, mscclppSocketTypeProxy, comm->abortFlag)); +// MSCCLPPCHECK(mscclppSocketConnect(sock)); +// } +// MSCCLPPCHECK(mscclppSocketSend(sock, &type, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketSend(sock, &transport, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketSend(sock, &send, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketSend(sock, &comm->localRank, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketRecv(sock, &proxyConn->connection, sizeof(void*))); +// struct mscclppTransportComm* tcomm = send ? &mscclppTransports[transport]->send : &mscclppTransports[transport]->recv; +// // If we need proxy progress, map progress ops +// if (tcomm->proxyProgress) { +// char poolPath[] = "/dev/shm/mscclpp-XXXXXX"; +// MSCCLPPCHECK(mscclppSocketRecv(sock, poolPath+sizeof("/dev/shm/mscclpp-")-1, sizeof("XXXXXX")-1)); +// struct mscclppProxyOps* proxyOps = comm->proxyState.proxyOps+proxyConn->localRank; +// if (proxyOps->pool == NULL) { +// MSCCLPPCHECK(mscclppShmOpen(poolPath, sizeof(struct mscclppProxyOpsPool), (void**)(&proxyOps->pool), NULL, -1, &proxyOps->handle)); +// proxyOps->nextOps = proxyOps->nextOpsEnd = proxyOps->freeOp = -1; +// } +// } +// INFO(MSCCLPP_NET, "Connection to proxy localRank %d -> connection %p", proxyConn->localRank, proxyConn->connection); +// proxyConn->comm = comm; +// return mscclppSuccess; +// } + +// const char* mscclppProxyMsgTypeStr[] = { "Unknown", "Init", "SharedInit", "Setup", "Connect", "Start", "Close", "Abort", "Stop" }; +// mscclppResult_t mscclppProxyCall(struct mscclppProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize) { +// struct mscclppSocket* sock; +// mscclppResult_t ret = mscclppSuccess; + +// if (proxyConn->comm->proxyState.peerSocks == NULL) return mscclppInternalError; +// sock = proxyConn->comm->proxyState.peerSocks + proxyConn->localRank; +// if (sock == NULL) return mscclppInternalError; +// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &type, sizeof(int)), ret, error); +// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &proxyConn->connection, sizeof(void*)), ret, error); +// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &reqSize, sizeof(int)), ret, error); +// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &respSize, sizeof(int)), ret, error); +// if (reqSize) MSCCLPPCHECKGOTO(mscclppSocketSend(sock, reqBuff, reqSize), ret, error); +// if (respSize) MSCCLPPCHECKGOTO(mscclppSocketRecv(sock, respBuff, respSize), ret, error); +// return mscclppSuccess; +// error: +// WARN("Proxy Call to rank %d failed (%s)", proxyConn->comm->localRankToRank[proxyConn->localRank], mscclppProxyMsgTypeStr[type]); +// return ret; +// } + +// static mscclppResult_t proxyProgressInit(struct mscclppComm* comm) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// if (state->opsPool == NULL) { +// int size = sizeof(struct mscclppProxyOpsPool); +// struct mscclppProxyOpsPool* pool = NULL; + +// // The service thread may be launched already but localRanks may not be set yet. +// while (comm->localRanks == 0) sched_yield(); + +// char shmPath[sizeof("/dev/shm/mscclpp-XXXXXX")]; +// shmPath[0] = '\0'; +// MSCCLPPCHECK(mscclppShmOpen(shmPath, size, (void**)&pool, NULL, comm->localRanks + 1, &state->handle)); +// // Init pool +// pool->nextOps = -1; + +// for (int r=0; rlocalRanks; r++) { +// pool->freeOps[r] = r*MAX_OPS_PER_PEER; +// for (int i=0; iops[r*MAX_OPS_PER_PEER+i].next = r*MAX_OPS_PER_PEER+i+1; +// pool->ops[(r+1)*MAX_OPS_PER_PEER-1].next = -1; +// } + +// // Setup mutex/cond to work inter-process +// pthread_mutexattr_t mutexAttr; +// pthread_mutexattr_init(&mutexAttr); +// pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED); +// pthread_mutex_init(&pool->mutex, &mutexAttr); +// pthread_condattr_t condAttr; +// pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED); +// pthread_cond_init(&pool->cond, &condAttr); +// state->opsPool = pool; + +// memcpy(state->opsPoolShmSuffix, shmPath+sizeof("/dev/shm/mscclpp-")-1, sizeof("XXXXXX")-1); + +// // All ops structures are created, we can start the progress thread +// MSCCLPPCHECK(mscclppProxyProgressCreate(comm)); +// } +// return mscclppSuccess; +// } + +// static void proxyOpsFree(struct mscclppComm* comm) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// if (mscclppShmClose(state->handle) != mscclppSuccess) { +// WARN("[Service thread] shm close failed"); +// } +// } + +// mscclppResult_t mscclppProxyShmUnlink(struct mscclppComm* comm) { +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// if (state->opsPool == NULL) return mscclppSuccess; + +// if (mscclppShmUnlink(state->handle) != mscclppSuccess) { +// WARN("[Service thread] proxy ops shm unlink failed"); +// } +// return mscclppSuccess; +// } + +// static mscclppResult_t proxyConnInit(struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm) { +// struct mscclppSocket* sock = &peer->sock; +// int id; +// struct mscclppProxyConnection* connection; +// MSCCLPPCHECK(mscclppProxyNewConnection(connectionPool, &id)); +// MSCCLPPCHECK(mscclppProxyGetConnection(connectionPool, id, &connection)); +// connection->sock = sock; +// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection->transport, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection->send, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketRecv(sock, &peer->localRank, sizeof(int))); +// connection->localRank = peer->localRank; +// MSCCLPPCHECK(mscclppSocketSend(sock, &connection, sizeof(void*))); +// connection->tcomm = connection->send ? &mscclppTransports[connection->transport]->send : &mscclppTransports[connection->transport]->recv; +// // If we need proxy progress, let's allocate ops and start the thread +// if (connection->tcomm->proxyProgress) { +// MSCCLPPCHECK(proxyProgressInit(comm)); +// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; +// MSCCLPPCHECK(mscclppSocketSend(sock, state->opsPoolShmSuffix, sizeof("XXXXXX")-1)); +// } +// INFO(MSCCLPP_NET, "New proxy %s connection %d from local rank %d, transport %d", connection->send ? "send":"recv", id, connection->localRank, connection->transport); +// __atomic_store_n(&connection->state, connInitialized, __ATOMIC_RELEASE); +// return mscclppSuccess; +// } + +// static mscclppResult_t proxyConnSharedInit(struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm) { +// struct mscclppSocket* sock = &peer->sock; +// struct mscclppProxyConnection* connection; +// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection, sizeof(void*))); +// int reqSize, respSize; +// MSCCLPPCHECK(mscclppSocketRecv(sock, &reqSize, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketRecv(sock, &respSize, sizeof(int))); +// if (reqSize != sizeof(int) || respSize != 0) return mscclppInternalError; +// int nChannels; +// MSCCLPPCHECK(mscclppSocketRecv(sock, &nChannels, sizeof(int))); +// if (connection->tcomm->proxySharedInit) MSCCLPPCHECK(connection->tcomm->proxySharedInit(connection, comm, nChannels)); +// __atomic_store_n(&connection->state, connSharedInitialized, __ATOMIC_RELEASE); +// return mscclppSuccess; +// } + +// static mscclppResult_t proxyProgressAsync(struct mscclppProxyAsyncOp* op, struct mscclppComm* comm, int* asyncOpCount) { +// int done = 1; +// if (op->type == mscclppProxyMsgSetup) { +// MSCCLPPCHECK(op->connection->tcomm->proxySetup(op->connection, comm, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); +// } else if (op->type == mscclppProxyMsgConnect) { +// MSCCLPPCHECK(op->connection->tcomm->proxyConnect(op->connection, comm, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); +// } else return mscclppInternalError; +// if (done) { +// if (op->type == mscclppProxyMsgSetup) +// __atomic_store_n(&op->connection->state, connSetupDone, __ATOMIC_RELEASE); +// else if (op->type == mscclppProxyMsgConnect) +// __atomic_store_n(&op->connection->state, connConnected, __ATOMIC_RELEASE); +// /* if setup or connect is done, we should not return any error at this point since +// * mscclppSocketSend might already send the respBuff to the requester. If we still choose +// * to abort and close the connection, it can cause segfault if the requester is using +// * the respBuff. */ +// if (op->respSize) mscclppSocketSend(op->connection->sock, op->respBuff, op->respSize); +// if (op->reqBuff) { +// free(op->reqBuff); +// op->reqBuff = NULL; +// } +// if (op->respBuff) { +// free(op->respBuff); +// op->respBuff = NULL; +// } +// op->type = 0; +// (*asyncOpCount)--; +// } else if (*comm->abortFlag != 0) { +// return mscclppInternalError; +// } + +// return mscclppSuccess; +// } + +// static mscclppResult_t proxyConnSetupConnect(int type, struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm, int* asyncOpCount) { +// struct mscclppSocket* sock = &peer->sock; +// struct mscclppProxyAsyncOp* asyncOp = &peer->asyncOps; +// asyncOp->type = type; +// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->connection, sizeof(void*))); + +// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->reqSize, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->respSize, sizeof(int))); +// if (asyncOp->reqSize) { +// MSCCLPPCHECK(mscclppCalloc(&asyncOp->reqBuff, asyncOp->reqSize)); +// MSCCLPPCHECK(mscclppSocketRecv(sock, asyncOp->reqBuff, asyncOp->reqSize)); +// } +// if (asyncOp->respSize) MSCCLPPCHECK(mscclppCalloc(&asyncOp->respBuff, asyncOp->respSize)); +// (*asyncOpCount)++; +// MSCCLPPCHECK(proxyProgressAsync(asyncOp, comm, asyncOpCount)); +// return mscclppSuccess; +// } + +// #include + +// void* mscclppProxyService(void* _args) { +// struct mscclppComm* comm = (struct mscclppComm *) _args; +// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); +// if (mscclppSetThreadContext(comm) != mscclppSuccess) { +// WARN("[Proxy Service] Failed to set CUDA context on device %d", comm->cudaDev); +// } else if (cudaSetDevice(comm->cudaDev) != cudaSuccess) { +// WARN("[Proxy Service] Failed to set CUDA device %d", comm->cudaDev); +// } +// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); + +// // Prepare poll descriptor +// struct mscclppProxyConnectionPool connectionPool; +// connectionPool.pools = NULL; +// connectionPool.banks = 0; +// connectionPool.offset = MSCCLPP_PROXY_CONN_POOL_SIZE; + +// struct pollfd pollfds[MSCCLPP_MAX_LOCAL_RANKS+1]; +// struct mscclppProxyLocalPeer peers[MSCCLPP_MAX_LOCAL_RANKS]; +// memset(&peers, 0, sizeof(struct mscclppProxyLocalPeer)*MSCCLPP_MAX_LOCAL_RANKS); +// for (int s=0; sproxyState.listenSock, &pollfds[MSCCLPP_MAX_LOCAL_RANKS].fd) != mscclppSuccess) { +// WARN("[Proxy Service] Get listenSock fd fails\n"); +// return NULL; +// }; +// pollfds[MSCCLPP_MAX_LOCAL_RANKS].events = POLLIN; + +// int maxnpeers = 0; +// int npeers = 0; +// int stop = 0; +// int asyncOpCount = 0; +// while (stop == 0 || (stop == 1 && npeers > 0)) { +// /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer +// * connections. Need to wait until all other related comms call abort and safely exit +// * together, or we could face segmentation fault. */ +// if (*comm->abortFlag != 0) stop = 1; +// /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ +// int ret; +// do { +// ret = poll(pollfds, MSCCLPP_MAX_LOCAL_RANKS+1, asyncOpCount ? 0 : 500); +// } while (ret < 0 && errno == EINTR); +// if (ret < 0) { +// WARN("[Proxy Service] Poll failed: %s", strerror(errno)); +// return NULL; +// } +// if (pollfds[MSCCLPP_MAX_LOCAL_RANKS].revents) { +// int s = 0; +// while (s < MSCCLPP_MAX_LOCAL_RANKS && pollfds[s].fd >= 0) s++; +// if (s == MSCCLPP_MAX_LOCAL_RANKS) { +// WARN("[Proxy service] Too many connections (%d max)", MSCCLPP_MAX_LOCAL_RANKS); +// return NULL; +// } +// if (maxnpeers < s+1) maxnpeers = s+1; +// if (mscclppSocketInit(&peers[s].sock) != mscclppSuccess) { +// WARN("[Service thread] Initialize peers[%d].sock fails\n", s); +// return NULL; +// } +// if (mscclppSocketAccept(&peers[s].sock, comm->proxyState.listenSock) != mscclppSuccess) { +// WARN("[Service thread] Accept failed %s", strerror(errno)); +// } else { +// if (mscclppSocketGetFd(&peers[s].sock, &pollfds[s].fd) != mscclppSuccess) { +// WARN("[Service thread] Get peers[%d].sock fd fails\n", s); +// return NULL; +// } +// npeers++; +// peers[s].localRank = -1; +// } +// } +// for (int s=0; ssock; +// struct mscclppProxyAsyncOp* op = &peer->asyncOps; +// int closeConn = 0; +// int type = 0; +// mscclppResult_t res = mscclppSuccess; + +// if (pollfds[s].fd == -1) continue; +// if (op->type != 0) { +// res = proxyProgressAsync(op, comm, &asyncOpCount); +// type = op->type; +// if (res != mscclppSuccess) closeConn = 1; +// } else if (pollfds[s].revents & POLLIN) { +// int closed; +// if (mscclppSocketTryRecv(sock, &type, sizeof(int), &closed) != mscclppSuccess) { +// WARN("[Service thread] Could not receive type from localRank %d", peer->localRank); +// closeConn = 1; +// } else if (closed) { +// INFO(MSCCLPP_INIT|MSCCLPP_NET, "[Service thread] Connection closed by localRank %d", peer->localRank); +// closeConn = 1; +// } else { +// if (type == mscclppProxyMsgStop) { +// stop = 1; +// closeConn = 1; +// } else if (type == mscclppProxyMsgClose) { +// closeConn = 1; +// } else if (type == mscclppProxyMsgInit) { +// res = proxyConnInit(peers+s, &connectionPool, comm); +// } else if (type == mscclppProxyMsgSharedInit) { +// res = proxyConnSharedInit(peers+s, &connectionPool, comm); +// } else if (type == mscclppProxyMsgSetup || type == mscclppProxyMsgConnect) { +// res = proxyConnSetupConnect(type, peers+s, &connectionPool, comm, &asyncOpCount); +// } else { +// WARN("[Service thread] Unknown command %d from localRank %d\n", type, peer->localRank); +// closeConn = 1; +// } +// } +// } else if (pollfds[s].revents & POLLHUP) { +// closeConn = 1; +// } +// if (res != mscclppSuccess) { +// WARN("[Proxy Service %d] Failed to execute operation %s from rank %d, retcode %d", comm->rank, mscclppProxyMsgTypeStr[type], comm->localRankToRank[peer->localRank], res); +// closeConn = 1; +// } +// if (closeConn) { +// mscclppSocketClose(sock); +// if (op->reqBuff) { +// free(op->reqBuff); +// op->reqBuff = NULL; +// } +// if (op->respBuff) { +// free(op->respBuff); +// op->respBuff = NULL; +// } +// op->type = 0; +// pollfds[s].fd = -1; +// npeers--; +// } +// } +// } + +// // Wait for all operations to complete and stop progress thread before freeing any resource +// if (mscclppProxyProgressDestroy(comm) != mscclppSuccess) { +// WARN("[Proxy Service] proxyDestroy failed"); +// } +// for (int s=0; sproxyState.listenSock); +// proxyOpsFree(comm); +// return NULL; +// } + +mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) { + comm->proxyState.listenSock = sock; + comm->proxyState.peerAddresses = peerAddresses; + return mscclppSuccess; +} + +// mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { +// // comm->proxyState.thread is pthread_join()'d by commFree() in init.cc +// pthread_create(&comm->proxyState.thread, NULL, mscclppProxyService, comm); +// mscclppSetThreadName(comm->proxyState.thread, "MSCCLPP Service %2d", comm->cudaDev); +// return mscclppSuccess; +// } + +// mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { +// struct mscclppProxyState* state = &comm->proxyState; + +// if (state == NULL) return mscclppSuccess; +// if (state->peerAddresses) { +// if (*comm->abortFlag == 0) { +// struct mscclppSocket sock; +// int type = mscclppProxyMsgStop; +// MSCCLPPCHECK(mscclppSocketInit(&sock, comm->proxyState.peerAddresses + comm->rank, comm->magic, mscclppSocketTypeProxy, comm->abortFlag)); +// MSCCLPPCHECK(mscclppSocketConnect(&sock)); +// MSCCLPPCHECK(mscclppSocketSend(&sock, &type, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketClose(&sock)); +// } +// free(state->peerAddresses); +// } + +// if (state->peerSocks) { +// for (int i=0; ilocalRanks; i++) { +// int fd; +// MSCCLPPCHECK(mscclppSocketGetFd(state->peerSocks + i, &fd)); +// if (fd >= 0) { +// if (state->proxyOps[i].pool) { +// MSCCLPPCHECK(mscclppShmClose(state->proxyOps[i].handle)); +// } +// if (state->sharedDevMems[i]) { +// CUDACHECK(cudaIpcCloseMemHandle(state->sharedDevMems[i])); +// } +// int type = mscclppProxyMsgClose; +// if (*comm->abortFlag == 0) MSCCLPPCHECK(mscclppSocketSend(state->peerSocks + i, &type, sizeof(int))); +// MSCCLPPCHECK(mscclppSocketClose(state->peerSocks + i)); +// } +// } +// free(state->peerSocks); +// free(state->proxyOps); +// free(state->sharedDevMems); +// } +// return mscclppSuccess; +// } diff --git a/src/socket.cc b/src/socket.cc new file mode 100644 index 00000000..57dc7cb8 --- /dev/null +++ b/src/socket.cc @@ -0,0 +1,820 @@ +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "socket.h" +#include "utils.h" +#include + +#include +#include +#include + +static mscclppResult_t socketProgressOpt(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset, int block, int* closed) { + int bytes = 0; + *closed = 0; + char* data = (char*)ptr; + char line[SOCKET_NAME_MAXLEN+1]; + do { + if (op == MSCCLPP_SOCKET_RECV) bytes = recv(sock->fd, data+(*offset), size-(*offset), block ? 0 : MSG_DONTWAIT); + if (op == MSCCLPP_SOCKET_SEND) bytes = send(sock->fd, data+(*offset), size-(*offset), block ? MSG_NOSIGNAL : MSG_DONTWAIT | MSG_NOSIGNAL); + if (op == MSCCLPP_SOCKET_RECV && bytes == 0) { + *closed = 1; + return mscclppSuccess; + } + if (bytes == -1) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + WARN("socketProgressOpt: Call to recv from %s failed : %s", mscclppSocketToString(&sock->addr, line), strerror(errno)); + return mscclppRemoteError; + } else { + bytes = 0; + } + } + (*offset) += bytes; + if (sock->abortFlag && *sock->abortFlag != 0) { + INFO(MSCCLPP_NET, "socketProgressOpt: abort called"); + return mscclppInternalError; + } + } while (bytes > 0 && (*offset) < size); + return mscclppSuccess; +} + +static mscclppResult_t socketProgress(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset) { + int closed; + MSCCLPPCHECK(socketProgressOpt(op, sock, ptr, size, offset, 0, &closed)); + if (closed) { + char line[SOCKET_NAME_MAXLEN+1]; + WARN("socketProgress: Connection closed by remote peer %s", mscclppSocketToString(&sock->addr, line, 0)); + return mscclppRemoteError; + } + return mscclppSuccess; +} + +static mscclppResult_t socketWait(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset) { + while (*offset < size) + MSCCLPPCHECK(socketProgress(op, sock, ptr, size, offset)); + return mscclppSuccess; +} + +/* Format a string representation of a (union mscclppSocketAddress *) socket address using getnameinfo() + * + * Output: "IPv4/IPv6 address" + */ +const char *mscclppSocketToString(union mscclppSocketAddress *addr, char *buf, const int numericHostForm /*= 1*/) { + if (buf == NULL || addr == NULL) return NULL; + struct sockaddr *saddr = &addr->sa; + if (saddr->sa_family != AF_INET && saddr->sa_family != AF_INET6) { buf[0]='\0'; return buf; } + char host[NI_MAXHOST], service[NI_MAXSERV]; + /* NI_NUMERICHOST: If set, then the numeric form of the hostname is returned. + * (When not set, this will still happen in case the node's name cannot be determined.) + */ + int flag = NI_NUMERICSERV | (numericHostForm ? NI_NUMERICHOST : 0); + (void) getnameinfo(saddr, sizeof(union mscclppSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, flag); + sprintf(buf, "%s<%s>", host, service); + return buf; +} + +static uint16_t socketToPort(union mscclppSocketAddress *addr) { + struct sockaddr *saddr = &addr->sa; + return ntohs(saddr->sa_family == AF_INET ? addr->sin.sin_port : addr->sin6.sin6_port); +} + +/* Allow the user to force the IPv4/IPv6 interface selection */ +static int envSocketFamily(void) { + int family = -1; // Family selection is not forced, will use first one found + char* env = getenv("MSCCLPP_SOCKET_FAMILY"); + if (env == NULL) + return family; + + INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_FAMILY set by environment to %s", env); + + if (strcmp(env, "AF_INET") == 0) + family = AF_INET; // IPv4 + else if (strcmp(env, "AF_INET6") == 0) + family = AF_INET6; // IPv6 + return family; +} + +static int findInterfaces(const char* prefixList, char* names, union mscclppSocketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) { +#ifdef ENABLE_TRACE + char line[SOCKET_NAME_MAXLEN+1]; +#endif + struct netIf userIfs[MAX_IFS]; + bool searchNot = prefixList && prefixList[0] == '^'; + if (searchNot) prefixList++; + bool searchExact = prefixList && prefixList[0] == '='; + if (searchExact) prefixList++; + int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); + + int found = 0; + struct ifaddrs *interfaces, *interface; + getifaddrs(&interfaces); + for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) { + if (interface->ifa_addr == NULL) continue; + + /* We only support IPv4 & IPv6 */ + int family = interface->ifa_addr->sa_family; + if (family != AF_INET && family != AF_INET6) + continue; + + TRACE(MSCCLPP_INIT|MSCCLPP_NET,"Found interface %s:%s", interface->ifa_name, mscclppSocketToString((union mscclppSocketAddress *) interface->ifa_addr, line)); + + /* Allow the caller to force the socket family type */ + if (sock_family != -1 && family != sock_family) + continue; + + /* We also need to skip IPv6 loopback interfaces */ + if (family == AF_INET6) { + struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr); + if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; + } + + // check against user specified interfaces + if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { + continue; + } + + // Check that this interface has not already been saved + // getifaddrs() normal order appears to be; IPv4, IPv6 Global, IPv6 Link + bool duplicate = false; + for (int i = 0; i < found; i++) { + if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; } + } + + if (!duplicate) { + // Store the interface name + strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize); + // Store the IP address + int salen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + memcpy(addrs+found, interface->ifa_addr, salen); + found++; + } + } + + freeifaddrs(interfaces); + return found; +} + +static bool matchSubnet(struct ifaddrs local_if, union mscclppSocketAddress* remote) { + /* Check family first */ + int family = local_if.ifa_addr->sa_family; + if (family != remote->sa.sa_family) { + return false; + } + + if (family == AF_INET) { + struct sockaddr_in* local_addr = (struct sockaddr_in*)(local_if.ifa_addr); + struct sockaddr_in* mask = (struct sockaddr_in*)(local_if.ifa_netmask); + struct sockaddr_in& remote_addr = remote->sin; + struct in_addr local_subnet, remote_subnet; + local_subnet.s_addr = local_addr->sin_addr.s_addr & mask->sin_addr.s_addr; + remote_subnet.s_addr = remote_addr.sin_addr.s_addr & mask->sin_addr.s_addr; + return (local_subnet.s_addr ^ remote_subnet.s_addr) ? false : true; + } else if (family == AF_INET6) { + struct sockaddr_in6* local_addr = (struct sockaddr_in6*)(local_if.ifa_addr); + struct sockaddr_in6* mask = (struct sockaddr_in6*)(local_if.ifa_netmask); + struct sockaddr_in6& remote_addr = remote->sin6; + struct in6_addr& local_in6 = local_addr->sin6_addr; + struct in6_addr& mask_in6 = mask->sin6_addr; + struct in6_addr& remote_in6 = remote_addr.sin6_addr; + bool same = true; + int len = 16; //IPv6 address is 16 unsigned char + for (int c = 0; c < len; c++) { //Network byte order is big-endian + char c1 = local_in6.s6_addr[c] & mask_in6.s6_addr[c]; + char c2 = remote_in6.s6_addr[c] & mask_in6.s6_addr[c]; + if (c1 ^ c2) { + same = false; + break; + } + } + // At last, we need to compare scope id + // Two Link-type addresses can have the same subnet address even though they are not in the same scope + // For Global type, this field is 0, so a comparison wouldn't matter + same &= (local_addr->sin6_scope_id == remote_addr.sin6_scope_id); + return same; + } else { + WARN("Net : Unsupported address family type"); + return false; + } +} + +int mscclppFindInterfaceMatchSubnet(char* ifNames, union mscclppSocketAddress* localAddrs, union mscclppSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs) { +#ifdef ENABLE_TRACE + char line[SOCKET_NAME_MAXLEN+1]; +#endif + char line_a[SOCKET_NAME_MAXLEN+1]; + int found = 0; + struct ifaddrs *interfaces, *interface; + getifaddrs(&interfaces); + for (interface = interfaces; interface && !found; interface = interface->ifa_next) { + if (interface->ifa_addr == NULL) continue; + + /* We only support IPv4 & IPv6 */ + int family = interface->ifa_addr->sa_family; + if (family != AF_INET && family != AF_INET6) + continue; + + // check against user specified interfaces + if (!matchSubnet(*interface, remoteAddr)) { + continue; + } + + // Store the local IP address + int salen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + memcpy(localAddrs+found, interface->ifa_addr, salen); + + // Store the interface name + strncpy(ifNames+found*ifNameMaxSize, interface->ifa_name, ifNameMaxSize); + + TRACE(MSCCLPP_INIT|MSCCLPP_NET,"NET : Found interface %s:%s in the same subnet as remote address %s", interface->ifa_name, mscclppSocketToString(localAddrs+found, line), mscclppSocketToString(remoteAddr, line_a)); + found++; + if (found == maxIfs) break; + } + + if (found == 0) { + WARN("Net : No interface found in the same subnet as remote address %s", mscclppSocketToString(remoteAddr, line_a)); + } + freeifaddrs(interfaces); + return found; +} + +mscclppResult_t mscclppSocketGetAddrFromString(union mscclppSocketAddress* ua, const char* ip_port_pair) { + if (!(ip_port_pair && strlen(ip_port_pair) > 1)) { + WARN("Net : string is null"); + return mscclppInvalidArgument; + } + + bool ipv6 = ip_port_pair[0] == '['; + /* Construct the sockaddress structure */ + if (!ipv6) { + struct netIf ni; + // parse : string, expect one pair + if (parseStringList(ip_port_pair, &ni, 1) != 1) { + WARN("Net : No valid : pair found"); + return mscclppInvalidArgument; + } + + struct addrinfo hints, *p; + int rv; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if ( (rv = getaddrinfo(ni.prefix, NULL, &hints, &p)) != 0) { + WARN("Net : error encountered when getting address info : %s", gai_strerror(rv)); + return mscclppInvalidArgument; + } + + // use the first + if (p->ai_family == AF_INET) { + struct sockaddr_in& sin = ua->sin; + memcpy(&sin, p->ai_addr, sizeof(struct sockaddr_in)); + sin.sin_family = AF_INET; // IPv4 + //inet_pton(AF_INET, ni.prefix, &(sin.sin_addr)); // IP address + sin.sin_port = htons(ni.port); // port + } else if (p->ai_family == AF_INET6) { + struct sockaddr_in6& sin6 = ua->sin6; + memcpy(&sin6, p->ai_addr, sizeof(struct sockaddr_in6)); + sin6.sin6_family = AF_INET6; // IPv6 + sin6.sin6_port = htons(ni.port); // port + sin6.sin6_flowinfo = 0; // needed by IPv6, but possibly obsolete + sin6.sin6_scope_id = 0; // should be global scope, set to 0 + } else { + WARN("Net : unsupported IP family"); + return mscclppInvalidArgument; + } + + freeaddrinfo(p); // all done with this structure + + } else { + int i, j = -1, len = strlen(ip_port_pair); + for (i = 1; i < len; i++) { + if (ip_port_pair[i] == '%') j = i; + if (ip_port_pair[i] == ']') break; + } + if (i == len) { + WARN("Net : No valid [IPv6]:port pair found"); + return mscclppInvalidArgument; + } + bool global_scope = (j == -1 ? true : false); // If no % found, global scope; otherwise, link scope + + char ip_str[NI_MAXHOST], port_str[NI_MAXSERV], if_name[IFNAMSIZ]; + memset(ip_str, '\0', sizeof(ip_str)); + memset(port_str, '\0', sizeof(port_str)); + memset(if_name, '\0', sizeof(if_name)); + strncpy(ip_str, ip_port_pair+1, global_scope ? i-1 : j-1); + strncpy(port_str, ip_port_pair+i+2, len-i-1); + int port = atoi(port_str); + if (!global_scope) strncpy(if_name, ip_port_pair+j+1, i-j-1); // If not global scope, we need the intf name + + struct sockaddr_in6& sin6 = ua->sin6; + sin6.sin6_family = AF_INET6; // IPv6 + inet_pton(AF_INET6, ip_str, &(sin6.sin6_addr)); // IP address + sin6.sin6_port = htons(port); // port + sin6.sin6_flowinfo = 0; // needed by IPv6, but possibly obsolete + sin6.sin6_scope_id = global_scope ? 0 : if_nametoindex(if_name); // 0 if global scope; intf index if link scope + } + return mscclppSuccess; +} + +int mscclppFindInterfaces(char* ifNames, union mscclppSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs) { + static int shownIfName = 0; + int nIfs = 0; + // Allow user to force the INET socket family selection + int sock_family = envSocketFamily(); + // User specified interface + char* env = getenv("MSCCLPP_SOCKET_IFNAME"); + if (env && strlen(env) > 1) { + INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_IFNAME set by environment to %s", env); + // Specified by user : find or fail + if (shownIfName++ == 0) INFO(MSCCLPP_NET, "MSCCLPP_SOCKET_IFNAME set to %s", env); + nIfs = findInterfaces(env, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + } else { + // Try to automatically pick the right one + // Start with IB + nIfs = findInterfaces("ib", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + // else see if we can get some hint from COMM ID + if (nIfs == 0) { + char* commId = getenv("MSCCLPP_COMM_ID"); + if (commId && strlen(commId) > 1) { + INFO(MSCCLPP_ENV, "MSCCLPP_COMM_ID set by environment to %s", commId); + // Try to find interface that is in the same subnet as the IP in comm id + union mscclppSocketAddress idAddr; + mscclppSocketGetAddrFromString(&idAddr, commId); + nIfs = mscclppFindInterfaceMatchSubnet(ifNames, ifAddrs, &idAddr, ifNameMaxSize, maxIfs); + } + } + // Then look for anything else (but not docker or lo) + if (nIfs == 0) nIfs = findInterfaces("^docker,lo", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + // Finally look for docker, then lo. + if (nIfs == 0) nIfs = findInterfaces("docker", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + if (nIfs == 0) nIfs = findInterfaces("lo", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + } + return nIfs; +} + +mscclppResult_t mscclppSocketListen(struct mscclppSocket* sock) { + if (sock == NULL) { + WARN("mscclppSocketListen: pass NULL socket"); + return mscclppInvalidArgument; + } + if (sock->fd == -1) { + WARN("mscclppSocketListen: file descriptor is -1"); + return mscclppInvalidArgument; + } + + if (socketToPort(&sock->addr)) { + // Port is forced by env. Make sure we get the port. + int opt = 1; +#if defined(SO_REUSEPORT) + SYSCHECK(setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt"); +#else + SYSCHECK(setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt"); +#endif + } + + // addr port should be 0 (Any port) + SYSCHECK(bind(sock->fd, &sock->addr.sa, sock->salen), "bind"); + + /* Get the assigned Port */ + socklen_t size = sock->salen; + SYSCHECK(getsockname(sock->fd, &sock->addr.sa, &size), "getsockname"); + +#ifdef ENABLE_TRACE + char line[SOCKET_NAME_MAXLEN+1]; + TRACE(MSCCLPP_INIT|MSCCLPP_NET,"Listening on socket %s", mscclppSocketToString(&sock->addr, line)); +#endif + + /* Put the socket in listen mode + * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn + */ + SYSCHECK(listen(sock->fd, 16384), "listen"); + sock->state = mscclppSocketStateReady; + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSocketAddress* addr) { + if (sock == NULL) { + WARN("mscclppSocketGetAddr: pass NULL socket"); + return mscclppInvalidArgument; + } + if (sock->state != mscclppSocketStateReady) return mscclppInternalError; + memcpy(addr, &sock->addr, sizeof(union mscclppSocketAddress)); + return mscclppSuccess; +} + +static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) { + socklen_t socklen = sizeof(union mscclppSocketAddress); + sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen); + if (sock->fd != -1) { + sock->state = mscclppSocketStateAccepted; + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { + WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno); + return mscclppSystemError; + } + return mscclppSuccess; +} + +static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock) { + uint64_t magic; + enum mscclppSocketType type; + int received = 0; + MSCCLPPCHECK(mscclppSocketProgress(MSCCLPP_SOCKET_RECV, sock, &magic, sizeof(magic), &received)); + if (received == 0) return mscclppSuccess; + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_RECV, sock, &magic, sizeof(magic), &received)); + if (magic != sock->magic) { + WARN("socketFinalizeAccept: wrong magic %lx != %lx", magic, sock->magic); + close(sock->fd); + sock->fd = -1; + // Ignore spurious connection and accept again + sock->state = mscclppSocketStateAccepting; + return mscclppSuccess; + } else { + received = 0; + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_RECV, sock, &type, sizeof(type), &received)); + if (type != sock->type) { + WARN("socketFinalizeAccept: wrong type %d != %d", type, sock->type); + sock->state = mscclppSocketStateError; + close(sock->fd); + sock->fd = -1; + return mscclppInternalError; + } else { + sock->state = mscclppSocketStateReady; + } + } + return mscclppSuccess; +} + +static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) { + /* blocking/non-blocking connect() is determined by asyncFlag. */ + int ret = connect(sock->fd, &sock->addr.sa, sock->salen); + + if (ret == 0) { + sock->state = mscclppSocketStateConnected; + return mscclppSuccess; + } else if (errno == EINPROGRESS) { + sock->state = mscclppSocketStateConnectPolling; + return mscclppSuccess; + } else if (errno == ECONNREFUSED) { + if (++sock->refusedRetries == RETRY_REFUSED_TIMES) { + sock->state = mscclppSocketStateError; + WARN("socketStartConnect: exceeded retries (%d)", sock->refusedRetries); + return mscclppRemoteError; + } + usleep(SLEEP_INT); + if (sock->refusedRetries % 1000 == 0) INFO(MSCCLPP_ALL, "Call to connect returned %s, retrying", strerror(errno)); + return mscclppSuccess; + } else if (errno == ETIMEDOUT) { + if (++sock->timedOutRetries == RETRY_TIMEDOUT_TIMES) { + sock->state = mscclppSocketStateError; + WARN("socketStartConnect: exceeded timeouts (%d)", sock->timedOutRetries); + return mscclppRemoteError; + } + usleep(SLEEP_INT); + return mscclppSuccess; + } else { + char line[SOCKET_NAME_MAXLEN+1]; + sock->state = mscclppSocketStateError; + WARN("socketStartConnect: Connect to %s failed : %s", mscclppSocketToString(&sock->addr, line), strerror(errno)); + return mscclppSystemError; + } +} + +static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) { + struct pollfd pfd; + int timeout = 1, ret; + socklen_t rlen = sizeof(int); + + memset(&pfd, 0, sizeof(struct pollfd)); + pfd.fd = sock->fd; + pfd.events = POLLOUT; + SYSCHECK(ret = poll(&pfd, 1, timeout), "poll"); + if (ret == 0) return mscclppSuccess; + + /* check socket status */ + EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0); + SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, (void*)&ret, &rlen), "getsockopt"); + + if (ret == 0) { + sock->state = mscclppSocketStateConnected; + } else if (ret == ECONNREFUSED) { + if (++sock->refusedRetries == RETRY_REFUSED_TIMES) { + sock->state = mscclppSocketStateError; + WARN("socketPollConnect: exceeded retries (%d)", sock->refusedRetries); + return mscclppRemoteError; + } + if (sock->refusedRetries % 1000 == 0) INFO(MSCCLPP_ALL, "Call to connect returned %s, retrying", strerror(errno)); + usleep(SLEEP_INT); + sock->state = mscclppSocketStateConnecting; + } else if (ret == ETIMEDOUT) { + if (++sock->timedOutRetries == RETRY_TIMEDOUT_TIMES) { + sock->state = mscclppSocketStateError; + WARN("socketPollConnect: exceeded timeouts (%d)", sock->timedOutRetries); + return mscclppRemoteError; + } + usleep(SLEEP_INT); + sock->state = mscclppSocketStateConnecting; + } else if (ret != EINPROGRESS) { + sock->state = mscclppSocketStateError; + return mscclppSystemError; + } + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketPollConnect(struct mscclppSocket* sock) { + if (sock == NULL) { + WARN("mscclppSocketPollConnect: pass NULL socket"); + return mscclppInvalidArgument; + } + MSCCLPPCHECK(socketPollConnect(sock)); + return mscclppSuccess; +} + +static mscclppResult_t socketFinalizeConnect(struct mscclppSocket* sock) { + int sent = 0; + MSCCLPPCHECK(socketProgress(MSCCLPP_SOCKET_SEND, sock, &sock->magic, sizeof(sock->magic), &sent)); + if (sent == 0) return mscclppSuccess; + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_SEND, sock, &sock->magic, sizeof(sock->magic), &sent)); + sent = 0; + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_SEND, sock, &sock->type, sizeof(sock->type), &sent)); + sock->state = mscclppSocketStateReady; + return mscclppSuccess; +} + +static mscclppResult_t socketProgressState(struct mscclppSocket* sock) { + if (sock->state == mscclppSocketStateAccepting) { + MSCCLPPCHECK(socketTryAccept(sock)); + } + if (sock->state == mscclppSocketStateAccepted) { + MSCCLPPCHECK(socketFinalizeAccept(sock)); + } + if (sock->state == mscclppSocketStateConnecting) { + MSCCLPPCHECK(socketStartConnect(sock)); + } + if (sock->state == mscclppSocketStateConnectPolling) { + MSCCLPPCHECK(socketPollConnect(sock)); + } + if (sock->state == mscclppSocketStateConnected) { + MSCCLPPCHECK(socketFinalizeConnect(sock)); + } + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketReady(struct mscclppSocket* sock, int *running) { + if (sock == NULL) { + *running = 0; + return mscclppSuccess; + } + if (sock->state == mscclppSocketStateError || sock->state == mscclppSocketStateClosed) { + WARN("mscclppSocketReady: unexpected socket state %d", sock->state); + return mscclppRemoteError; + } + *running = (sock->state == mscclppSocketStateReady) ? 1 : 0; + if (*running == 0) { + MSCCLPPCHECK(socketProgressState(sock)); + *running = (sock->state == mscclppSocketStateReady) ? 1 : 0; + } + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketConnect(struct mscclppSocket* sock) { +#ifdef ENABLE_TRACE + char line[SOCKET_NAME_MAXLEN+1]; +#endif + const int one = 1; + + if (sock == NULL) { + WARN("mscclppSocketConnect: pass NULL socket"); + return mscclppInvalidArgument; + } + if (sock->fd == -1) { + WARN("mscclppSocketConnect: file descriptor is -1"); + return mscclppInvalidArgument; + } + + if (sock->state != mscclppSocketStateInitialized) { + WARN("mscclppSocketConnect: wrong socket state %d", sock->state); + if (sock->state == mscclppSocketStateError) return mscclppRemoteError; + return mscclppInternalError; + } + TRACE(MSCCLPP_INIT|MSCCLPP_NET,"Connecting to socket %s", mscclppSocketToString(&sock->addr, line)); + + SYSCHECK(setsockopt(sock->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int)), "setsockopt"); + + sock->state = mscclppSocketStateConnecting; + do { + MSCCLPPCHECK(socketProgressState(sock)); + } while (sock->asyncFlag == 0 && + (sock->abortFlag == NULL || *sock->abortFlag == 0) && + (sock->state == mscclppSocketStateConnecting || + sock->state == mscclppSocketStateConnectPolling || + sock->state == mscclppSocketStateConnected)); + + if (sock->abortFlag && *sock->abortFlag != 0) return mscclppInternalError; + + switch (sock->state) { + case mscclppSocketStateConnecting: + case mscclppSocketStateConnectPolling: + case mscclppSocketStateConnected: + case mscclppSocketStateReady: + return mscclppSuccess; + case mscclppSocketStateError: + return mscclppSystemError; + default: + WARN("mscclppSocketConnect: wrong socket state %d", sock->state); + return mscclppInternalError; + } +} + +mscclppResult_t mscclppSocketAccept(struct mscclppSocket* sock, struct mscclppSocket* listenSock) { + mscclppResult_t ret = mscclppSuccess; + + if (listenSock == NULL || sock == NULL) { + WARN("mscclppSocketAccept: pass NULL socket"); + ret = mscclppInvalidArgument; + goto exit; + } + if (listenSock->state != mscclppSocketStateReady) { + WARN("mscclppSocketAccept: wrong socket state %d", listenSock->state); + if (listenSock->state == mscclppSocketStateError) + ret = mscclppSystemError; + else + ret = mscclppInternalError; + goto exit; + } + + if (sock->acceptFd == -1) { + memcpy(sock, listenSock, sizeof(struct mscclppSocket)); + sock->acceptFd = listenSock->fd; + sock->state = mscclppSocketStateAccepting; + } + + do { + MSCCLPPCHECKGOTO(socketProgressState(sock), ret, exit); + } while (sock->asyncFlag == 0 && + (sock->abortFlag == NULL || *sock->abortFlag == 0) && + (sock->state == mscclppSocketStateAccepting || + sock->state == mscclppSocketStateAccepted)); + + if (sock->abortFlag && *sock->abortFlag != 0) return mscclppInternalError; + + switch (sock->state) { + case mscclppSocketStateAccepting: + case mscclppSocketStateAccepted: + case mscclppSocketStateReady: + ret = mscclppSuccess; + break; + case mscclppSocketStateError: + ret = mscclppSystemError; + break; + default: + WARN("mscclppSocketAccept: wrong socket state %d", sock->state); + ret = mscclppInternalError; + break; + } + +exit: + return ret; +} + +mscclppResult_t mscclppSocketInit(struct mscclppSocket* sock, union mscclppSocketAddress* addr, uint64_t magic, enum mscclppSocketType type, volatile uint32_t* abortFlag, int asyncFlag) { + mscclppResult_t ret = mscclppSuccess; + + if (sock == NULL) goto exit; + sock->timedOutRetries = 0; + sock->refusedRetries = 0; + sock->abortFlag = abortFlag; + sock->asyncFlag = asyncFlag; + sock->state = mscclppSocketStateInitialized; + sock->magic = magic; + sock->type = type; + sock->fd = -1; + sock->acceptFd = -1; + + if (addr) { + /* IPv4/IPv6 support */ + int family; + memcpy(&sock->addr, addr, sizeof(union mscclppSocketAddress)); + family = sock->addr.sa.sa_family; + if (family != AF_INET && family != AF_INET6) { + char line[SOCKET_NAME_MAXLEN+1]; + WARN("mscclppSocketInit: connecting to address %s with family %d is neither AF_INET(%d) nor AF_INET6(%d)", + mscclppSocketToString(&sock->addr, line), family, AF_INET, AF_INET6); + ret = mscclppInternalError; + goto fail; + } + sock->salen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + + /* Connect to a hostname / port */ + sock->fd = socket(family, SOCK_STREAM, 0); + if (sock->fd == -1) { + WARN("mscclppSocketInit: Socket creation failed : %s", strerror(errno)); + ret = mscclppSystemError; + goto fail; + } + } else { + memset(&sock->addr, 0, sizeof(union mscclppSocketAddress)); + } + + /* Set socket as non-blocking if async or if we need to be able to abort */ + if ((sock->asyncFlag || sock->abortFlag) && sock->fd >= 0) { + int flags; + EQCHECKGOTO(flags = fcntl(sock->fd, F_GETFL), -1, ret, fail); + SYSCHECKGOTO(fcntl(sock->fd, F_SETFL, flags | O_NONBLOCK), ret, fail); + } + +exit: + return ret; +fail: + goto exit; +} + +mscclppResult_t mscclppSocketProgress(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset) { + if (sock == NULL) { + WARN("mscclppSocketProgress: pass NULL socket"); + return mscclppInvalidArgument; + } + MSCCLPPCHECK(socketProgress(op, sock, ptr, size, offset)); + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketWait(int op, struct mscclppSocket* sock, void* ptr, int size, int* offset) { + if (sock == NULL) { + WARN("mscclppSocketWait: pass NULL socket"); + return mscclppInvalidArgument; + } + MSCCLPPCHECK(socketWait(op, sock, ptr, size, offset)); + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketSend(struct mscclppSocket* sock, void* ptr, int size) { + int offset = 0; + if (sock == NULL) { + WARN("mscclppSocketSend: pass NULL socket"); + return mscclppInvalidArgument; + } + if (sock->state != mscclppSocketStateReady) { + WARN("mscclppSocketSend: socket state (%d) is not ready", sock->state); + return mscclppInternalError; + } + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_SEND, sock, ptr, size, &offset)); + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketRecv(struct mscclppSocket* sock, void* ptr, int size) { + int offset = 0; + if (sock == NULL) { + WARN("mscclppSocketRecv: pass NULL socket"); + return mscclppInvalidArgument; + } + if (sock->state != mscclppSocketStateReady) { + WARN("mscclppSocketRecv: socket state (%d) is not ready", sock->state); + return mscclppInternalError; + } + MSCCLPPCHECK(socketWait(MSCCLPP_SOCKET_RECV, sock, ptr, size, &offset)); + return mscclppSuccess; +} + +// Receive or detect connection closed +mscclppResult_t mscclppSocketTryRecv(struct mscclppSocket* sock, void* ptr, int size, int* closed) { + int offset = 0; + if (sock == NULL) { + WARN("mscclppSocketTryRecv: pass NULL socket"); + return mscclppInvalidArgument; + } + *closed = 0; + while (offset < size) { + MSCCLPPCHECK(socketProgressOpt(MSCCLPP_SOCKET_RECV, sock, ptr, size, &offset, 0, closed)); + if (*closed) return mscclppSuccess; + } + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketClose(struct mscclppSocket* sock) { + if (sock != NULL) { + if (sock->fd >= 0) close(sock->fd); + sock->state = mscclppSocketStateClosed; + sock->fd = -1; + } + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketGetFd(struct mscclppSocket* sock, int* fd) { + if (sock == NULL) { + WARN("mscclppSocketGetFd: pass NULL socket"); + return mscclppInvalidArgument; + } + if (fd) *fd = sock->fd; + return mscclppSuccess; +} + +mscclppResult_t mscclppSocketSetFd(int fd, struct mscclppSocket* sock) { + if (sock == NULL) { + WARN("mscclppSocketGetFd: pass NULL socket"); + return mscclppInvalidArgument; + } + sock->fd = fd; + return mscclppSuccess; +} diff --git a/src/utils.cc b/src/utils.cc new file mode 100644 index 00000000..25f4f703 --- /dev/null +++ b/src/utils.cc @@ -0,0 +1,293 @@ +/************************************************************************* + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "utils.h" +#include "core.h" + +// #include "nvmlwrap.h" + +#include + +// Get current Compute Capability +int mscclppCudaCompCap() { + int cudaDev; + if (cudaGetDevice(&cudaDev) != cudaSuccess) return 0; + int ccMajor, ccMinor; + if (cudaDeviceGetAttribute(&ccMajor, cudaDevAttrComputeCapabilityMajor, cudaDev) != cudaSuccess) return 0; + if (cudaDeviceGetAttribute(&ccMinor, cudaDevAttrComputeCapabilityMinor, cudaDev) != cudaSuccess) return 0; + return ccMajor*10+ccMinor; +} + +mscclppResult_t int64ToBusId(int64_t id, char* busId) { + sprintf(busId, "%04lx:%02lx:%02lx.%01lx", (id) >> 20, (id & 0xff000) >> 12, (id & 0xff0) >> 4, (id & 0xf)); + return mscclppSuccess; +} + +mscclppResult_t busIdToInt64(const char* busId, int64_t* id) { + char hexStr[17]; // Longest possible int64 hex string + null terminator. + int hexOffset = 0; + for (int i = 0; hexOffset < sizeof(hexStr) - 1; i++) { + char c = busId[i]; + if (c == '.' || c == ':') continue; + if ((c >= '0' && c <= '9') || + (c >= 'A' && c <= 'F') || + (c >= 'a' && c <= 'f')) { + hexStr[hexOffset++] = busId[i]; + } else break; + } + hexStr[hexOffset] = '\0'; + *id = strtol(hexStr, NULL, 16); + return mscclppSuccess; +} + +// Convert a logical cudaDev index to the NVML device minor number +mscclppResult_t getBusId(int cudaDev, int64_t *busId) { + // On most systems, the PCI bus ID comes back as in the 0000:00:00.0 + // format. Still need to allocate proper space in case PCI domain goes + // higher. + char busIdStr[] = "00000000:00:00.0"; + CUDACHECK(cudaDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), cudaDev)); + MSCCLPPCHECK(busIdToInt64(busIdStr, busId)); + return mscclppSuccess; +} + +mscclppResult_t getHostName(char* hostname, int maxlen, const char delim) { + if (gethostname(hostname, maxlen) != 0) { + strncpy(hostname, "unknown", maxlen); + return mscclppSystemError; + } + int i = 0; + while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen-1)) i++; + hostname[i] = '\0'; + return mscclppSuccess; +} + +uint64_t getHash(const char* string, int n) { + // Based on DJB2a, result = result * 33 ^ char + uint64_t result = 5381; + for (int c = 0; c < n; c++) { + result = ((result << 5) + result) ^ string[c]; + } + return result; +} + +/* Generate a hash of the unique identifying string for this host + * that will be unique for both bare-metal and container instances + * Equivalent of a hash of; + * + * $(hostname)$(cat /proc/sys/kernel/random/boot_id) + * + * This string can be overridden by using the MSCCLPP_HOSTID env var. + */ +#define HOSTID_FILE "/proc/sys/kernel/random/boot_id" +uint64_t getHostHash(void) { + char hostHash[1024]; + char *hostId; + + // Fall back is the full hostname if something fails + (void) getHostName(hostHash, sizeof(hostHash), '\0'); + int offset = strlen(hostHash); + + if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) { + INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId); + strncpy(hostHash, hostId, sizeof(hostHash)); + } else { + FILE *file = fopen(HOSTID_FILE, "r"); + if (file != NULL) { + char *p; + if (fscanf(file, "%ms", &p) == 1) { + strncpy(hostHash+offset, p, sizeof(hostHash)-offset-1); + free(p); + } + } + fclose(file); + } + + // Make sure the string is terminated + hostHash[sizeof(hostHash)-1]='\0'; + + TRACE(MSCCLPP_INIT,"unique hostname '%s'", hostHash); + + return getHash(hostHash, strlen(hostHash)); +} + +/* Generate a hash of the unique identifying string for this process + * that will be unique for both bare-metal and container instances + * Equivalent of a hash of; + * + * $$ $(readlink /proc/self/ns/pid) + */ +uint64_t getPidHash(void) { + char pname[1024]; + // Start off with our pid ($$) + sprintf(pname, "%ld", (long) getpid()); + int plen = strlen(pname); + int len = readlink("/proc/self/ns/pid", pname+plen, sizeof(pname)-1-plen); + if (len < 0) len = 0; + + pname[plen+len]='\0'; + TRACE(MSCCLPP_INIT,"unique PID '%s'", pname); + + return getHash(pname, strlen(pname)); +} + +int parseStringList(const char* string, struct netIf* ifList, int maxList) { + if (!string) return 0; + + const char* ptr = string; + + int ifNum = 0; + int ifC = 0; + char c; + do { + c = *ptr; + if (c == ':') { + if (ifC > 0) { + ifList[ifNum].prefix[ifC] = '\0'; + ifList[ifNum].port = atoi(ptr+1); + ifNum++; ifC = 0; + } + while (c != ',' && c != '\0') c = *(++ptr); + } else if (c == ',' || c == '\0') { + if (ifC > 0) { + ifList[ifNum].prefix[ifC] = '\0'; + ifList[ifNum].port = -1; + ifNum++; ifC = 0; + } + } else { + ifList[ifNum].prefix[ifC] = c; + ifC++; + } + ptr++; + } while (ifNum < maxList && c); + return ifNum; +} + +static bool matchIf(const char* string, const char* ref, bool matchExact) { + // Make sure to include '\0' in the exact case + int matchLen = matchExact ? strlen(string) + 1 : strlen(ref); + return strncmp(string, ref, matchLen) == 0; +} + +static bool matchPort(const int port1, const int port2) { + if (port1 == -1) return true; + if (port2 == -1) return true; + if (port1 == port2) return true; + return false; +} + + +bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact) { + // Make an exception for the case where no user list is defined + if (listSize == 0) return true; + + for (int i=0; ihunks` points to the top of the stack non-empty hunks. Hunks above + // this (reachable via `->above`) are empty. + struct Hunk* top = me->topFrame.hunk; + size_t mallocSize = 0; + + // If we have lots of space left in hunk but that wasn't enough then we'll + // allocate the object unhunked. + if (me->topFrame.end - me->topFrame.bumper >= 8<<10) + goto unhunked; + + // If we have another hunk (which must be empty) waiting above this one and + // the object fits then use that. + if (top && top->above) { + struct Hunk* top1 = top->above; + uintptr_t uobj = (reinterpret_cast(top1) + sizeof(struct Hunk) + align-1) & -uintptr_t(align); + if (uobj + size <= reinterpret_cast(top1) + top1->size) { + me->topFrame.hunk = top1; + me->topFrame.bumper = uobj + size; + me->topFrame.end = reinterpret_cast(top1) + top1->size; + return reinterpret_cast(uobj); + } + } + + { // If the next hunk we're going to allocate wouldn't be big enough but the + // Unhunk proxy fits in the current hunk then go allocate as unhunked. + size_t nextSize = (top ? top->size : 0) + (64<<10); + constexpr size_t maxAlign = 64; + if (nextSize < sizeof(struct Hunk) + maxAlign + size) { + uintptr_t uproxy = (me->topFrame.bumper + alignof(Unhunk)-1) & -uintptr_t(alignof(Unhunk)); + if (uproxy + sizeof(struct Unhunk) <= me->topFrame.end) + goto unhunked; + } + + // At this point we must need another hunk, either to fit the object + // itself or its Unhunk proxy. + mallocSize = nextSize; + INFO(MSCCLPP_ALLOC, "%s:%d memory stack hunk malloc(%llu)", __FILE__, __LINE__, (unsigned long long)mallocSize); + struct Hunk *top1 = (struct Hunk*)malloc(mallocSize); + if (top1 == nullptr) goto malloc_exhausted; + top1->size = nextSize; + top1->above = nullptr; + if (top) top->above = top1; + top = top1; + me->topFrame.hunk = top; + me->topFrame.end = reinterpret_cast(top) + nextSize; + me->topFrame.bumper = reinterpret_cast(top) + sizeof(struct Hunk); + } + + { // Try to fit object in the new top hunk. + uintptr_t uobj = (me->topFrame.bumper + align-1) & -uintptr_t(align); + if (uobj + size <= me->topFrame.end) { + me->topFrame.bumper = uobj + size; + return reinterpret_cast(uobj); + } + } + +unhunked: + { // We need to allocate the object out-of-band and put an Unhunk proxy in-band + // to keep track of it. + uintptr_t uproxy = (me->topFrame.bumper + alignof(Unhunk)-1) & -uintptr_t(alignof(Unhunk)); + Unhunk* proxy = reinterpret_cast(uproxy); + me->topFrame.bumper = uproxy + sizeof(Unhunk); + proxy->next = me->topFrame.unhunks; + me->topFrame.unhunks = proxy; + mallocSize = size; + proxy->obj = malloc(mallocSize); + INFO(MSCCLPP_ALLOC, "%s:%d memory stack non-hunk malloc(%llu)", __FILE__, __LINE__, (unsigned long long)mallocSize); + if (proxy->obj == nullptr) goto malloc_exhausted; + return proxy->obj; + } + +malloc_exhausted: + WARN("%s:%d Unrecoverable error detected: malloc(size=%llu) returned null.", __FILE__, __LINE__, (unsigned long long)mallocSize); + abort(); +} + +void mscclppMemoryStackDestruct(struct mscclppMemoryStack* me) { + // Free unhunks first because both the frames and unhunk proxies lie within the hunks. + struct mscclppMemoryStack::Frame* f = &me->topFrame; + while (f != nullptr) { + struct mscclppMemoryStack::Unhunk* u = f->unhunks; + while (u != nullptr) { + free(u->obj); + u = u->next; + } + f = f->below; + } + // Free hunks + struct mscclppMemoryStack::Hunk* h = me->stub.above; + while (h != nullptr) { + struct mscclppMemoryStack::Hunk *h1 = h->above; + free(h); + h = h1; + } +}