diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5e583d45..dc86f638 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,2 @@ -file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.h) -file(GLOB to_remove gdr.cc) -list(REMOVE_ITEM SOURCES ${to_remove}) - +file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc) target_sources(mscclpp PRIVATE ${SOURCES}) diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index b6311948..d3020030 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -1,8 +1,8 @@ -#include "bootstrap.h" #include "api.h" #include "checks.hpp" #include #include "utils.h" +#include "socket.h" #include #include @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -17,17 +18,6 @@ using namespace mscclpp; namespace { -uint64_t hashUniqueId(const mscclppBootstrapHandle& id) -{ - const char* bytes = (const char*)&id; - uint64_t h = 0xdeadbeef; - for (int i = 0; i < (int)sizeof(mscclppBootstrapHandle); i++) { - h ^= h >> 32; - h *= 0x8db3db47fa2994ad; - h += bytes[i]; - } - return h; -} mscclppResult_t setFilesLimit() { @@ -515,551 +505,3 @@ MSCCLPP_API_CPP Bootstrap::~Bootstrap() { pimpl_->close(); } - -// ------------------- Old bootstrap functions ------------------- -struct BootstrapRootArgs -{ - struct mscclppSocket* listenSock; - uint64_t magic; -}; - -/* Init functions */ -static char bootstrapNetIfName[MAX_IF_NAME_SIZE + 1]; -static union mscclppSocketAddress bootstrapNetIfAddr; -static int bootstrapNetInitDone = 0; -pthread_mutex_t bootstrapNetLock = PTHREAD_MUTEX_INITIALIZER; - -mscclppResult_t bootstrapNetInit(const char* ip_port_pair) -{ - if (bootstrapNetInitDone == 0) { - pthread_mutex_lock(&bootstrapNetLock); - if (bootstrapNetInitDone == 0) { - const char* env; - if (ip_port_pair) { - env = ip_port_pair; - } else { - env = getenv("MSCCLPP_COMM_ID"); - } - if (env) { - union mscclppSocketAddress remoteAddr; - if (mscclppSocketGetAddrFromString(&remoteAddr, env) != mscclppSuccess) { - WARN("Invalid MSCCLPP_COMM_ID, please use format: : or []: or :"); - return mscclppInvalidArgument; - } - if (mscclppFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr, MAX_IF_NAME_SIZE, - 1) <= 0) { - WARN("NET/Socket : No usable listening interface found"); - return mscclppSystemError; - } - } else { - int nIfs = mscclppFindInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, MAX_IF_NAME_SIZE, 1); - if (nIfs <= 0) { - WARN("Bootstrap : no socket interface found"); - return mscclppInternalError; - } - } - char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2]; - sprintf(line, " %s:", bootstrapNetIfName); - mscclppSocketToString(&bootstrapNetIfAddr, line + strlen(line)); - INFO(MSCCLPP_INIT, "Bootstrap : Using%s", line); - bootstrapNetInitDone = 1; - } - pthread_mutex_unlock(&bootstrapNetLock); - } - return mscclppSuccess; -} - -// Additional sync functions -static mscclppResult_t bootstrapNetSend(struct mscclppSocket* sock, void* data, int size) -{ - MSCCLPPCHECK(mscclppSocketSend(sock, &size, sizeof(int))); - MSCCLPPCHECK(mscclppSocketSend(sock, data, size)); - return mscclppSuccess; -} -static mscclppResult_t bootstrapNetRecv(struct mscclppSocket* sock, void* data, int size) -{ - int recvSize; - MSCCLPPCHECK(mscclppSocketRecv(sock, &recvSize, sizeof(int))); - if (recvSize > size) { - WARN("Message truncated : received %d bytes instead of %d", recvSize, size); - return mscclppInternalError; - } - MSCCLPPCHECK(mscclppSocketRecv(sock, data, std::min(recvSize, size))); - return mscclppSuccess; -} - -// struct ExtInfo -// { -// int rank; -// int nranks; -// union mscclppSocketAddress extAddressListenRoot; -// union mscclppSocketAddress extAddressListen; -// }; - -#include - -// static mscclppResult_t setFilesLimit() -// { -// struct rlimit filesLimit; -// SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit"); -// filesLimit.rlim_cur = filesLimit.rlim_max; -// SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit"); -// return mscclppSuccess; -// } - -static void* bootstrapRoot(void* rargs) -{ - struct BootstrapRootArgs* args = (struct BootstrapRootArgs*)rargs; - struct mscclppSocket* listenSock = args->listenSock; - uint64_t magic = args->magic; - mscclppResult_t res = mscclppSuccess; - int nranks = 0, c = 0; - struct ExtInfo info; - union mscclppSocketAddress* rankAddresses = NULL; - union mscclppSocketAddress* rankAddressesRoot = NULL; // for initial rank <-> root information exchange - union mscclppSocketAddress* zero = NULL; - MSCCLPPCHECKGOTO(mscclppCalloc(&zero, 1), res, out); - setFilesLimit(); - - TRACE(MSCCLPP_INIT, "BEGIN"); - /* Receive addresses from all ranks */ - do { - struct mscclppSocket sock; - MSCCLPPCHECKGOTO(mscclppSocketInit(&sock), res, out); - MSCCLPPCHECKGOTO(mscclppSocketAccept(&sock, listenSock), res, out); - MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &info, sizeof(info)), res, out); - MSCCLPPCHECKGOTO(mscclppSocketClose(&sock), res, out); - - if (c == 0) { - nranks = info.nRanks; - MSCCLPPCHECKGOTO(mscclppCalloc(&rankAddresses, nranks), res, out); - MSCCLPPCHECKGOTO(mscclppCalloc(&rankAddressesRoot, nranks), res, out); - } - - if (nranks != info.nRanks) { - WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nRanks); - goto out; - } - - if (memcmp(zero, &rankAddressesRoot[info.rank], sizeof(union mscclppSocketAddress)) != 0) { - WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks); - goto out; - } - - // Save the connection handle for that rank - memcpy(rankAddressesRoot + info.rank, &info.extAddressListenRoot, sizeof(union mscclppSocketAddress)); - memcpy(rankAddresses + info.rank, &info.extAddressListen, sizeof(union mscclppSocketAddress)); - - ++c; - TRACE(MSCCLPP_INIT, "Received connect from rank %d total %d/%d", info.rank, c, nranks); - } while (c < nranks); - TRACE(MSCCLPP_INIT, "COLLECTED ALL %d HANDLES", nranks); - - // Send the connect handle for the next rank in the AllGather ring - for (int r = 0; r < nranks; ++r) { - int next = (r + 1) % nranks; - struct mscclppSocket sock; - MSCCLPPCHECKGOTO(mscclppSocketInit(&sock, rankAddressesRoot + r, magic, mscclppSocketTypeBootstrap), res, out); - MSCCLPPCHECKGOTO(mscclppSocketConnect(&sock), res, out); - MSCCLPPCHECKGOTO(bootstrapNetSend(&sock, rankAddresses + next, sizeof(union mscclppSocketAddress)), res, out); - MSCCLPPCHECKGOTO(mscclppSocketClose(&sock), res, out); - } - TRACE(MSCCLPP_INIT, "SENT OUT ALL %d HANDLES", nranks); - -out: - if (listenSock != NULL) { - mscclppSocketClose(listenSock); - free(listenSock); - } - if (rankAddresses) - free(rankAddresses); - if (rankAddressesRoot) - free(rankAddressesRoot); - if (zero) - free(zero); - free(rargs); - - TRACE(MSCCLPP_INIT, "DONE"); - return NULL; -} - -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle) -{ - struct mscclppSocket* listenSock; - struct BootstrapRootArgs* args; - pthread_t thread; - - MSCCLPPCHECK(mscclppCalloc(&listenSock, 1)); - MSCCLPPCHECK(mscclppSocketInit(listenSock, &handle->addr, handle->magic, mscclppSocketTypeBootstrap, NULL, 0)); - MSCCLPPCHECK(mscclppSocketListen(listenSock)); - MSCCLPPCHECK(mscclppSocketGetAddr(listenSock, &handle->addr)); - - MSCCLPPCHECK(mscclppCalloc(&args, 1)); - args->listenSock = listenSock; - args->magic = handle->magic; - NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0); - mscclppSetThreadName(thread, "MSCCLPP BootstrapR"); - NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d - return mscclppSuccess; -} - -// #include -// #include - -mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot, const char* ip_port_pair) -{ - memset(handle, 0, sizeof(mscclppBootstrapHandle)); - const char* env = NULL; - - if (ip_port_pair) { - env = ip_port_pair; - } else { - env = getenv("MSCCLPP_COMM_ID"); - } - if (env) { - handle->magic = 0xdeadbeef; - - INFO(MSCCLPP_ENV, "MSCCLPP_COMM_ID set by environment to %s", env); - if (mscclppSocketGetAddrFromString(&handle->addr, env) != mscclppSuccess) { - WARN("Invalid MSCCLPP_COMM_ID, please use format: : or []: or :"); - return mscclppInvalidArgument; - } - if (isRoot) - MSCCLPPCHECK(bootstrapCreateRoot(handle)); - } else { - MSCCLPPCHECK(getRandomData(&handle->magic, sizeof(handle->magic))); - memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union mscclppSocketAddress)); - MSCCLPPCHECK(bootstrapCreateRoot(handle)); - } - printf("addr = %s port = %d\n", inet_ntoa(handle->addr.sin.sin_addr), (int)ntohs(handle->addr.sin.sin_port)); - // printf("addr = %s\n", inet_ntoa((*(struct sockaddr_in*)&handle->addr.sa).sin_addr)); - - return mscclppSuccess; -} - -struct UnexConn -{ - int peer; - int tag; - struct mscclppSocket sock; - struct UnexConn* next; -}; - -struct BootstrapState -{ - struct mscclppSocket listenSock; - struct mscclppSocket ringRecvSocket; - struct mscclppSocket ringSendSocket; - union mscclppSocketAddress* peerCommAddresses; - union mscclppSocketAddress* peerProxyAddresses; - struct UnexConn* unexpectedConnections; - int cudaDev; - int rank; - int nranks; - uint64_t magic; - volatile uint32_t* abortFlag; -}; - -mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm) -{ - int rank = comm->rank; - int nranks = comm->nRanks; - struct BootstrapState* state; - struct mscclppSocket* proxySocket; - mscclppSocketAddress nextAddr; - struct mscclppSocket sock, listenSockRoot; - struct ExtInfo info; - - MSCCLPPCHECK(mscclppCalloc(&state, 1)); - state->rank = rank; - state->nranks = nranks; - state->abortFlag = comm->abortFlag; - comm->bootstrap = state; - comm->magic = state->magic = handle->magic; - - TRACE(MSCCLPP_INIT, "rank %d nranks %d", rank, nranks); - - info.rank = rank; - info.nRanks = nranks; - - // Create socket for other ranks to contact me - MSCCLPPCHECK(mscclppSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeBootstrap, - comm->abortFlag)); - MSCCLPPCHECK(mscclppSocketListen(&state->listenSock)); - MSCCLPPCHECK(mscclppSocketGetAddr(&state->listenSock, &info.extAddressListen)); - - // Create socket for root to contact me - MSCCLPPCHECK( - mscclppSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag)); - MSCCLPPCHECK(mscclppSocketListen(&listenSockRoot)); - MSCCLPPCHECK(mscclppSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot)); - - // stagger connection times to avoid an overload of the root - if (nranks > 128) { - long msec = rank; - struct timespec tv; - tv.tv_sec = msec / 1000; - tv.tv_nsec = 1000000 * (msec % 1000); - TRACE(MSCCLPP_INIT, "rank %d delaying connection to root by %ld msec", rank, msec); - (void)nanosleep(&tv, NULL); - } - - // send info on my listening socket to root - MSCCLPPCHECK(mscclppSocketInit(&sock, &handle->addr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag)); - MSCCLPPCHECK(mscclppSocketConnect(&sock)); - MSCCLPPCHECK(bootstrapNetSend(&sock, &info, sizeof(info))); - MSCCLPPCHECK(mscclppSocketClose(&sock)); - - // get info on my "next" rank in the bootstrap ring from root - MSCCLPPCHECK(mscclppSocketInit(&sock)); - MSCCLPPCHECK(mscclppSocketAccept(&sock, &listenSockRoot)); - MSCCLPPCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union mscclppSocketAddress))); - MSCCLPPCHECK(mscclppSocketClose(&sock)); - MSCCLPPCHECK(mscclppSocketClose(&listenSockRoot)); - - MSCCLPPCHECK( - mscclppSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag)); - MSCCLPPCHECK(mscclppSocketConnect(&state->ringSendSocket)); - // Accept the connect request from the previous rank in the AllGather ring - MSCCLPPCHECK(mscclppSocketInit(&state->ringRecvSocket)); - MSCCLPPCHECK(mscclppSocketAccept(&state->ringRecvSocket, &state->listenSock)); - - // AllGather all listen handlers - MSCCLPPCHECK(mscclppCalloc(&state->peerCommAddresses, nranks)); - MSCCLPPCHECK(mscclppSocketGetAddr(&state->listenSock, state->peerCommAddresses + rank)); - MSCCLPPCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union mscclppSocketAddress))); - - // Create the service proxy - MSCCLPPCHECK(mscclppCalloc(&state->peerProxyAddresses, nranks)); - - // proxy is aborted through a message; don't set abortFlag - MSCCLPPCHECK(mscclppCalloc(&proxySocket, 1)); - MSCCLPPCHECK( - mscclppSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeProxy, comm->abortFlag)); - MSCCLPPCHECK(mscclppSocketListen(proxySocket)); - MSCCLPPCHECK(mscclppSocketGetAddr(proxySocket, state->peerProxyAddresses + rank)); - MSCCLPPCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union mscclppSocketAddress))); - // MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses)); - - TRACE(MSCCLPP_INIT, "rank %d nranks %d - DONE", rank, nranks); - - return mscclppSuccess; -} - -mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size) -{ - struct BootstrapState* state = (struct BootstrapState*)commState; - char* data = (char*)allData; - int rank = state->rank; - int nranks = state->nranks; - - TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d", rank, nranks, size); - - /* Simple ring based AllGather - * At each step i receive data from (rank-i-1) from left - * and send previous step's data from (rank-i) to right - */ - for (int i = 0; i < nranks - 1; i++) { - size_t rslice = (rank - i - 1 + nranks) % nranks; - size_t sslice = (rank - i + nranks) % nranks; - - // Send slice to the right - MSCCLPPCHECK(bootstrapNetSend(&state->ringSendSocket, data + sslice * size, size)); - // Recv slice from the left - MSCCLPPCHECK(bootstrapNetRecv(&state->ringRecvSocket, data + rslice * size, size)); - } - - TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size); - return mscclppSuccess; -} - -mscclppResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size) -{ - mscclppResult_t ret = mscclppSuccess; - struct BootstrapState* state = (struct BootstrapState*)commState; - struct mscclppSocket sock; - - MSCCLPPCHECKGOTO(mscclppSocketInit(&sock, state->peerCommAddresses + peer, state->magic, mscclppSocketTypeBootstrap, - state->abortFlag), - ret, fail); - MSCCLPPCHECKGOTO(mscclppSocketConnect(&sock), ret, fail); - MSCCLPPCHECKGOTO(bootstrapNetSend(&sock, &state->rank, sizeof(int)), ret, fail); - MSCCLPPCHECKGOTO(bootstrapNetSend(&sock, &tag, sizeof(int)), ret, fail); - MSCCLPPCHECKGOTO(bootstrapNetSend(&sock, data, size), ret, fail); - -exit: - MSCCLPPCHECK(mscclppSocketClose(&sock)); - return ret; -fail: - goto exit; -} - -mscclppResult_t bootstrapBarrier(void* commState, int* ranks, int rank, int nranks, int tag) -{ - if (nranks == 1) - return mscclppSuccess; - TRACE(MSCCLPP_INIT, "rank %d nranks %d tag %x - ENTER", rank, nranks, tag); - - /* Simple intra process barrier - * - * Based on the dissemination algorithm by Debra Hensgen, Raphael Finkel, and Udi Manbet, - * "Two Algorithms for Barrier Synchronization," International Journal of Parallel Programming, 17(1):1-17, 1988" - */ - int data[1]; - for (int mask = 1; mask < nranks; mask <<= 1) { - int src = (rank - mask + nranks) % nranks; - int dst = (rank + mask) % nranks; - MSCCLPPCHECK(bootstrapSend(commState, ranks[dst], tag, data, sizeof(data))); - MSCCLPPCHECK(bootstrapRecv(commState, ranks[src], tag, data, sizeof(data))); - } - - TRACE(MSCCLPP_INIT, "rank %d nranks %d tag %x - DONE", rank, nranks, tag); - return mscclppSuccess; -} - -mscclppResult_t bootstrapIntraNodeAllGather(void* commState, int* ranks, int rank, int nranks, void* allData, int size) -{ - if (nranks == 1) - return mscclppSuccess; - char* data = (char*)allData; - TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - ENTER", rank, nranks, size); - - for (int i = 1; i < nranks; i++) { - int src = (rank - i + nranks) % nranks; - int dst = (rank + i) % nranks; - MSCCLPPCHECK(bootstrapSend(commState, ranks[dst], /*tag=*/i, data + rank * size, size)); - MSCCLPPCHECK(bootstrapRecv(commState, ranks[src], /*tag=*/i, data + src * size, size)); - } - - TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size); - return mscclppSuccess; -} - -mscclppResult_t unexpectedEnqueue(struct BootstrapState* state, int peer, int tag, struct mscclppSocket* sock) -{ - // New unex - struct UnexConn* unex; - MSCCLPPCHECK(mscclppCalloc(&unex, 1)); - unex->peer = peer; - unex->tag = tag; - memcpy(&unex->sock, sock, sizeof(struct mscclppSocket)); - - // Enqueue - struct UnexConn* list = state->unexpectedConnections; - if (list == NULL) { - state->unexpectedConnections = unex; - return mscclppSuccess; - } - while (list->next) - list = list->next; - list->next = unex; - return mscclppSuccess; -} - -mscclppResult_t unexpectedDequeue(struct BootstrapState* state, int peer, int tag, struct mscclppSocket* sock, - int* found) -{ - struct UnexConn* elem = state->unexpectedConnections; - struct UnexConn* prev = NULL; - *found = 0; - while (elem) { - if (elem->peer == peer && elem->tag == tag) { - if (prev == NULL) { - state->unexpectedConnections = elem->next; - } else { - prev->next = elem->next; - } - memcpy(sock, &elem->sock, sizeof(struct mscclppSocket)); - free(elem); - *found = 1; - return mscclppSuccess; - } - prev = elem; - elem = elem->next; - } - return mscclppSuccess; -} - -static void unexpectedFree(struct BootstrapState* state) -{ - struct UnexConn* elem = state->unexpectedConnections; - struct UnexConn* prev = NULL; - - while (elem) { - prev = elem; - elem = elem->next; - free(prev); - } - return; -} - -// We can't know who we'll receive from, so we need to receive everything at once -mscclppResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size) -{ - mscclppResult_t ret = mscclppSuccess; - struct BootstrapState* state = (struct BootstrapState*)commState; - struct mscclppSocket sock; - int newPeer, newTag; - - // Search unexpected connections first - int found; - MSCCLPPCHECK(unexpectedDequeue(state, peer, tag, &sock, &found)); - if (found) { - MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail); - goto exit; - } - - // Then look for new connections - while (1) { - MSCCLPPCHECKGOTO(mscclppSocketInit(&sock), ret, fail); - MSCCLPPCHECKGOTO(mscclppSocketAccept(&sock, &state->listenSock), ret, fail); - MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &newPeer, sizeof(int)), ret, fail); - MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &newTag, sizeof(int)), ret, fail); - if (newPeer == peer && newTag == tag) { - MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail); - goto exit; - } - // Unexpected connection. Save for later. - MSCCLPPCHECKGOTO(unexpectedEnqueue(state, newPeer, newTag, &sock), ret, fail); - } -exit: - MSCCLPPCHECK(mscclppSocketClose(&sock)); - return ret; -fail: - goto exit; -} - -mscclppResult_t bootstrapClose(void* commState) -{ - struct BootstrapState* state = (struct BootstrapState*)commState; - if (state->unexpectedConnections != NULL) { - unexpectedFree(state); - if (*state->abortFlag == 0) { - WARN("Unexpected connections are not empty"); - return mscclppInternalError; - } - } - - MSCCLPPCHECK(mscclppSocketClose(&state->listenSock)); - MSCCLPPCHECK(mscclppSocketClose(&state->ringSendSocket)); - MSCCLPPCHECK(mscclppSocketClose(&state->ringRecvSocket)); - - free(state->peerCommAddresses); - free(state); - - return mscclppSuccess; -} - -mscclppResult_t bootstrapAbort(void* commState) -{ - struct BootstrapState* state = (struct BootstrapState*)commState; - if (commState == NULL) - return mscclppSuccess; - MSCCLPPCHECK(mscclppSocketClose(&state->listenSock)); - MSCCLPPCHECK(mscclppSocketClose(&state->ringSendSocket)); - MSCCLPPCHECK(mscclppSocketClose(&state->ringRecvSocket)); - free(state->peerCommAddresses); - free(state->peerProxyAddresses); - free(state); - return mscclppSuccess; -} diff --git a/src/c_style_remnants.cc b/src/c_style_remnants.cc new file mode 100644 index 00000000..613ff7ee --- /dev/null +++ b/src/c_style_remnants.cc @@ -0,0 +1,45 @@ +#include "mscclpp.h" +#include "debug.h" +#include "config.h" +#include "api.h" + +MSCCLPP_API void mscclppDefaultLogHandler(const char* msg) +{ + mscclppDebugDefaultLogHandler(msg); +} + +MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler) +{ + return mscclppDebugSetLogHandler(handler); +} + +MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout) +{ + mscclppConfig* config = mscclppConfig::getInstance(); + config->setBootstrapConnectionTimeoutConfig(timeout); + return mscclppSuccess; +} + +MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code) +{ + switch (code) { + case mscclppSuccess: + return "no error"; + case mscclppUnhandledCudaError: + return "unhandled cuda error"; + case mscclppSystemError: + return "unhandled system error"; + case mscclppInternalError: + return "internal error"; + case mscclppInvalidArgument: + return "invalid argument"; + case mscclppInvalidUsage: + return "invalid usage"; + case mscclppRemoteError: + return "remote process exited or there was a network error"; + case mscclppInProgress: + return "MSCCL++ operation in progress"; + default: + return "unknown result code"; + } +} diff --git a/src/communicator.cc b/src/communicator.cc index 1d670fa6..8b721232 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -2,7 +2,6 @@ #include "api.h" #include "checks.hpp" -#include "comm.h" #include "communicator.hpp" #include "connection.hpp" #include "debug.h" diff --git a/src/gdr.cc b/src/gdr.cc deleted file mode 100644 index 95cd6870..00000000 --- a/src/gdr.cc +++ /dev/null @@ -1,75 +0,0 @@ -#include "gdr.h" - -// Used to make the GDR library calls thread safe -pthread_mutex_t gdrLock = PTHREAD_MUTEX_INITIALIZER; - -gdr_t wrap_gdr_open(void) -{ - return gdr_open(); -} - -mscclppResult_t wrap_gdr_close(gdr_t g) -{ - int ret = gdr_close(g); - if (ret != 0) { - WARN("gdr_close() failed: %d", ret); - return mscclppSystemError; - } - return mscclppSuccess; -} - -mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, - gdr_mh_t* handle) -{ - int ret; - GDRLOCKCALL(gdr_pin_buffer(g, addr, size, p2p_token, va_space, handle), ret); - if (ret != 0) { - WARN("gdr_pin_buffer(addr %lx, size %zi) failed: %d", addr, size, ret); - return mscclppSystemError; - } - return mscclppSuccess; -} - -mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle) -{ - int ret; - GDRLOCKCALL(gdr_unpin_buffer(g, handle), ret); - if (ret != 0) { - WARN("gdr_unpin_buffer(handle %lx) failed: %d", handle.h, ret); - return mscclppSystemError; - } - return mscclppSuccess; -} - -mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info) -{ - int ret; - GDRLOCKCALL(gdr_get_info(g, handle, info), ret); - if (ret != 0) { - WARN("gdr_get_info(handle %lx) failed: %d", handle.h, ret); - return mscclppSystemError; - } - return mscclppSuccess; -} - -mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void** va, size_t size) -{ - int ret; - GDRLOCKCALL(gdr_map(g, handle, va, size), ret); - if (ret != 0) { - WARN("gdr_map(handle %lx, size %zi) failed: %d", handle.h, size, ret); - return mscclppSystemError; - } - return mscclppSuccess; -} - -mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size) -{ - int ret; - GDRLOCKCALL(gdr_unmap(g, handle, va, size), ret); - if (ret != 0) { - WARN("gdr_unmap(handle %lx, va %p, size %zi) failed: %d", handle.h, va, size, ret); - return mscclppSystemError; - } - return mscclppSuccess; -} diff --git a/src/ib.cc b/src/ib.cc index e6c91eb3..32db71bb 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -8,13 +8,14 @@ #include "alloc.h" #include "api.h" #include "checks.hpp" -#include "comm.h" #include "debug.h" #include "ib.hpp" #include #include #include +#define MAXCONNECTIONS 64 + namespace mscclpp { IbMr::IbMr(void* pd, void* buff, std::size_t size) : buff(buff) diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h deleted file mode 100644 index 6bb20f81..00000000 --- a/src/include/bootstrap.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include "mscclpp.h" -#include "socket.h" - -#include "comm.h" - -// ------------------- Old bootstrap headers: to be removed ------------------- - -struct mscclppBootstrapHandle -{ - uint64_t magic; - union mscclppSocketAddress addr; -}; -mscclppResult_t bootstrapNetInit(const char* ip_port_pair = NULL); -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle); -mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot = true, - const char* ip_port_pair = NULL); -mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm); -mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size); -mscclppResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size); -mscclppResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size); -mscclppResult_t bootstrapBarrier(void* commState, int* ranks, int rank, int nranks, int tag); -mscclppResult_t bootstrapIntraNodeAllGather(void* commState, int* ranks, int rank, int nranks, void* allData, int size); -mscclppResult_t bootstrapClose(void* commState); -mscclppResult_t bootstrapAbort(void* commState); diff --git a/src/include/comm.h b/src/include/comm.h deleted file mode 100644 index e6a067d6..00000000 --- a/src/include/comm.h +++ /dev/null @@ -1,65 +0,0 @@ -/************************************************************************* - * Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_COMM_H_ -#define MSCCLPP_COMM_H_ - -#include "ib.hpp" -#include "proxy.h" -#include -#include - -#define MAXCONNECTIONS 64 - -struct mscclppBufferRegistration -{ - void* data; - uint64_t size; -}; - -struct mscclppConn -{ - int connId; - mscclppTransport_t transport; - int remoteRank; - uint64_t buffSize; - struct mscclppDevConn* devConn; - struct mscclppHostConn* hostConn; - - std::vector bufferRegistrations; - std::vector remoteBufferRegistrations; - - mscclpp::IbCtx* ibCtx; -#if defined(ENABLE_NPKIT) - std::vector npkitUsedReqIds; - std::vector npkitFreeReqIds; -#endif -}; - -struct mscclppComm -{ - struct mscclppConn conns[MAXCONNECTIONS]; - struct mscclppDevConn devConns[MAXCONNECTIONS]; - int nConns; - - void* bootstrap; - - // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. - uint64_t magic; - - int rank; // my rank in the communicator - int nRanks; // number of GPUs in communicator - int cudaDev; // my cuda device index - int devNumaNode; // my device's NUMA node - - // Flag to ask MSCCLPP kernels to abort - volatile uint32_t* abortFlag; - - std::unique_ptr ibContext[MSCCLPP_IB_MAX_DEVS]; - struct mscclppProxyState* proxyState[MSCCLPP_PROXY_MAX_NUM]; -}; - -#endif diff --git a/src/include/gdr.h b/src/include/gdr.h deleted file mode 100644 index d7e0269a..00000000 --- a/src/include/gdr.h +++ /dev/null @@ -1,156 +0,0 @@ -#ifndef MSCCLPP_GDR_H_ -#define MSCCLPP_GDR_H_ - -#include "align.h" -#include "alloc.h" -#include "checks.h" -#include "debug.h" -#include "gdrapi.h" - -// These can be used if the GDR library isn't thread safe -#include -extern pthread_mutex_t gdrLock; -#define GDRLOCK() pthread_mutex_lock(&gdrLock) -#define GDRUNLOCK() pthread_mutex_unlock(&gdrLock) -#define GDRLOCKCALL(cmd, ret) \ - do { \ - GDRLOCK(); \ - ret = cmd; \ - GDRUNLOCK(); \ - } while (false) - -#define GDRCHECK(cmd) \ - do { \ - int e; \ - /* GDRLOCKCALL(cmd, e); */ \ - e = cmd; \ - if (e != 0) { \ - WARN("GDRCOPY failure %d", e); \ - return mscclppSystemError; \ - } \ - } while (false) - -gdr_t wrap_gdr_open(void); -mscclppResult_t wrap_gdr_close(gdr_t g); -mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, - gdr_mh_t* handle); -mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle); -mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info); -mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void** va, size_t size); -mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size); - -// Global GDR driver handle -extern gdr_t mscclppGdrCopy; - -typedef struct gdr_mem_desc -{ - void* gdrDevMem; - void* gdrMap; - size_t gdrOffset; - size_t gdrMapSize; - gdr_mh_t gdrMh; -} gdr_mem_desc_t; - -static gdr_t mscclppGdrInit() -{ - // int libMajor, libMinor, drvMajor, drvMinor; - gdr_t handle = wrap_gdr_open(); - - // if (handle != NULL) { - // mscclppResult_t res; - - // // Query the version of libgdrapi - // MSCCLPPCHECKGOTO(wrap_gdr_runtime_get_version(&libMajor, &libMinor), res, error); - - // // Query the version of gdrdrv driver - // MSCCLPPCHECKGOTO(wrap_gdr_driver_get_version(handle, &drvMajor, &drvMinor), res, error); - - // // Only support GDRAPI 2.1 and later - // if (libMajor < 2 || (libMajor == 2 && libMinor < 1) || drvMajor < 2 || (drvMajor == 2 && drvMinor < 1)) { - // goto error; - // } - // else - // INFO(MSCCLPP_INIT, "GDRCOPY enabled library %d.%d driver %d.%d", libMajor, libMinor, drvMajor, drvMinor); - // } - return handle; - // error: - // if (handle != NULL) (void) wrap_gdr_close(handle); - // return NULL; -} - -template -mscclppResult_t mscclppGdrCudaCallocDebug(T** ptr, T** devPtr, size_t nelem, void** gdrDesc, const char* filefunc, - int line) -{ - mscclppResult_t result = mscclppSuccess; - cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed; - *ptr = nullptr; - *devPtr = nullptr; - *gdrDesc = nullptr; - CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode)); - - gdr_info_t info; - size_t mapSize; - gdr_mh_t mh; - char* devMem; - void* gdrMap; - ssize_t off; - gdr_mem_desc_t* md; - uint64_t alignedAddr; - size_t align; - - mapSize = sizeof(T) * nelem; - - // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE - ALIGN_SIZE(mapSize, GPU_PAGE_SIZE); - // GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too - MSCCLPPCHECKGOTO(mscclppCudaCalloc(&devMem, mapSize + GPU_PAGE_SIZE - 1), result, finish); - alignedAddr = (((uint64_t)devMem) + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; - align = alignedAddr - (uint64_t)devMem; - MSCCLPPCHECKGOTO(wrap_gdr_pin_buffer(mscclppGdrCopy, alignedAddr, mapSize, 0, 0, &mh), result, finish); - - MSCCLPPCHECKGOTO(wrap_gdr_map(mscclppGdrCopy, mh, &gdrMap, mapSize), result, finish); - - MSCCLPPCHECKGOTO(wrap_gdr_get_info(mscclppGdrCopy, mh, &info), result, finish); - - // Will offset ever be non zero ? - off = info.va - alignedAddr; - - MSCCLPPCHECKGOTO(mscclppCalloc(&md, 1), result, finish); - md->gdrDevMem = devMem; - md->gdrMap = gdrMap; - md->gdrMapSize = mapSize; - md->gdrOffset = off + align; - md->gdrMh = mh; - *gdrDesc = md; - - *ptr = (T*)((char*)gdrMap + off); - if (devPtr) - *devPtr = (T*)(devMem + off + align); - - TRACE(mscclpp_INIT, "GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zi at %p", md->gdrDevMem, - md->gdrMap, md->gdrOffset, md->gdrMh.h, md->gdrMapSize, *ptr); - - return mscclppSuccess; - -finish: - CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode)); - if (*ptr == nullptr) - WARN("Failed to CUDA calloc %ld bytes", nelem * sizeof(T)); - INFO(MSCCLPP_ALLOC, "%s:%d Cuda Alloc Size %ld pointer %p", filefunc, line, nelem * sizeof(T), *ptr); - return result; -} -#define mscclppGdrCudaCalloc(...) mscclppGdrCudaCallocDebug(__VA_ARGS__, __FILE__, __LINE__) - -static mscclppResult_t mscclppGdrCudaFree(void* gdrDesc) -{ - gdr_mem_desc_t* md = (gdr_mem_desc_t*)gdrDesc; - MSCCLPPCHECK(wrap_gdr_unmap(mscclppGdrCopy, md->gdrMh, md->gdrMap, md->gdrMapSize)); - MSCCLPPCHECK(wrap_gdr_unpin_buffer(mscclppGdrCopy, md->gdrMh)); - CUDACHECK(cudaFree(md->gdrDevMem)); - free(md); - - return mscclppSuccess; -} - -#endif diff --git a/src/include/registered_ptr.hpp b/src/include/registered_ptr.hpp deleted file mode 100644 index 4f03ea40..00000000 --- a/src/include/registered_ptr.hpp +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef MSCCLPP_REGISTERED_PTR_HPP_ -#define MSCCLPP_REGISTERED_PTR_HPP_ - -namespace mscclpp { - -template class RegisteredPtr -{ - RegisteredMemory memory; - size_t offset; - -public: - RegisteredPtr(RegisteredMemory memory, size_t offset) : memory(memory), offset(offset) - { - } - RegisteredPtr(RegisteredMemory memory) : RegisteredPtr(memory, 0) - { - } - ~RegisteredPtr() - { - } - - RegisteredMemory memory() - { - return memory; - } - - T* data() - { - return reinterpret_cast(memory.data()); - } - - size_t size() - { - return memory.size() / sizeof(T); - } - - size_t offset() - { - return offset; - } - - RegisteredPtr operator+(size_t offset) - { - return RegisteredPtr(memory, this->offset + offset); - } - - // TODO: all other relevant overloads -}; - -} // namespace mscclpp - -#endif // MSCCLPP_REGISTERED_PTR_HPP_ \ No newline at end of file diff --git a/src/init.cc b/src/init.cc deleted file mode 100644 index 03f037c4..00000000 --- a/src/init.cc +++ /dev/null @@ -1,920 +0,0 @@ -#include "alloc.h" -#include "api.h" -#include "bootstrap.h" -#include "checks.h" -#include "config.h" -#if defined(MSCCLPP_USE_GDRCOPY) -#include "gdr.h" -#endif -#include "infiniband/verbs.h" -#include "mscclpp.h" -#include -#include -#include -#if defined(ENABLE_NPKIT) -#include "npkit/npkit.h" -#endif - -static uint64_t hashUniqueId(mscclppUniqueId const& id) -{ - char const* bytes = (char const*)&id; - uint64_t h = 0xdeadbeef; - for (int i = 0; i < (int)sizeof(mscclppUniqueId); i++) { - h ^= h >> 32; - h *= 0x8db3db47fa2994ad; - h += bytes[i]; - } - return h; -} - -pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; -static bool initialized = false; -// static size_t maxLocalSizeBytes = 0; - -#if defined(MSCCLPP_USE_GDRCOPY) - -gdr_t mscclppGdrCopy = NULL; - -mscclppResult_t initGdrCopy() -{ - if (mscclppGdrCopy == NULL) { - mscclppGdrCopy = mscclppGdrInit(); - if (mscclppGdrCopy == NULL) { - WARN("GDR init failed"); - return mscclppSystemError; - } - } - return mscclppSuccess; -} - -#endif - -static mscclppResult_t mscclppInit() -{ - if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE)) - return mscclppSuccess; - pthread_mutex_lock(&initLock); - if (!initialized) { - // Always initialize bootstrap network - MSCCLPPCHECK(bootstrapNetInit()); - - __atomic_store_n(&initialized, true, __ATOMIC_RELEASE); - } - pthread_mutex_unlock(&initLock); - return mscclppSuccess; -} - -static std::string mscclppShmFileName(mscclppComm_t comm, int rank) -{ - std::stringstream ss; - ss << "mscclpp." << std::hex << comm->magic << "." << rank; - return ss.str(); -} - -MSCCLPP_API mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* out) -{ - MSCCLPPCHECK(mscclppInit()); - // mscclppCHECK(PtrCheck(out, "GetUniqueId", "out")); - mscclppResult_t res = bootstrapGetUniqueId((struct mscclppBootstrapHandle*)out); - TRACE_CALL("mscclppGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out)); - return res; -} - -MSCCLPP_API mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size) -{ - MSCCLPPCHECK(bootstrapAllGather(comm->bootstrap, data, size)); - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank) -{ -#if defined(MSCCLPP_USE_GDRCOPY) - MSCCLPPCHECK(initGdrCopy()); -#endif - - mscclppResult_t res = mscclppSuccess; - mscclppComm_t _comm = NULL; - // uint64_t hash = getHostHash(); - // uint64_t *hashes; - // std::map hashToNode; - - MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail); - _comm->rank = rank; - _comm->nRanks = nranks; - _comm->devNumaNode = -1; - // We assume that the user has set the device to the intended one already - CUDACHECK(cudaGetDevice(&_comm->cudaDev)); - - MSCCLPPCHECK(bootstrapNetInit(ipPortPair)); - mscclppBootstrapHandle handle; - MSCCLPPCHECK(bootstrapGetUniqueId(&handle, rank == 0, ipPortPair)); - _comm->magic = handle.magic; - - MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail); - MSCCLPPCHECK(bootstrapInit(&handle, _comm)); - -#if defined(ENABLE_NPKIT) - // Init NPKit - MSCCLPPCHECK(NpKit::Init(_comm->rank)); -#endif - - *comm = _comm; - return res; -fail: - if (_comm) { - if (_comm->abortFlag) - mscclppCudaHostFree((void*)_comm->abortFlag); - free(_comm); - } - if (comm) - *comm = NULL; - return res; -} - -MSCCLPP_API mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank) -{ -#if defined(MSCCLPP_USE_GDRCOPY) - MSCCLPPCHECK(initGdrCopy()); -#endif - - mscclppResult_t res = mscclppSuccess; - mscclppComm_t _comm = NULL; - mscclppBootstrapHandle* handle = (mscclppBootstrapHandle*)&id; - - MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail); - _comm->rank = rank; - _comm->nRanks = nranks; - // We assume that the user has set the device to the intended one already - CUDACHECK(cudaGetDevice(&_comm->cudaDev)); - - MSCCLPPCHECK(bootstrapNetInit()); - _comm->magic = handle->magic; - - MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail); - MSCCLPPCHECK(bootstrapInit(handle, _comm)); - -#if defined(ENABLE_NPKIT) - // Init NPKit - MSCCLPPCHECK(NpKit::Init(_comm->rank)); -#endif - - *comm = _comm; - return res; -fail: - if (_comm) { - if (_comm->abortFlag) - mscclppCudaHostFree((void*)_comm->abortFlag); - free(_comm); - } - if (comm) - *comm = NULL; - return res; -} - -MSCCLPP_API mscclppResult_t mscclppCommDestroy(mscclppComm_t comm) -{ -#if defined(ENABLE_NPKIT) - const char* npkitDumpDir = nullptr; -#endif - - if (comm == NULL) - return mscclppSuccess; - - for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { - struct mscclppProxyState* proxyState = comm->proxyState[i]; - if (proxyState) { - MSCCLPPCHECK(proxyState->fifo.destroy()); - if (proxyState->p2pStream) - CUDACHECK(cudaStreamDestroy(proxyState->p2pStream)); - free(proxyState); - } - } - - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i]) { - comm->ibContext[i].reset(nullptr); - } - } - - for (int i = 0; i < comm->nConns; i++) { - struct mscclppConn* conn = &comm->conns[i]; - if (conn) { - MSCCLPPCHECK(mscclppCudaFree(conn->devConn->localSignalEpochId)); - MSCCLPPCHECK(mscclppCudaFree(conn->devConn->waitEpochId)); - if (conn->hostConn) - delete conn->hostConn; - } - } - - if (comm->bootstrap) - MSCCLPPCHECK(bootstrapClose(comm->bootstrap)); - - mscclppCudaHostFree((void*)comm->abortFlag); - free(comm); - -#if defined(ENABLE_NPKIT) - // Dump NPKit events and shutdown - npkitDumpDir = getenv("NPKIT_DUMP_DIR"); - if (npkitDumpDir == nullptr) { - WARN("NPKIT_DUMP_DIR is empty"); - } else { - MSCCLPPCHECK(NpKit::Dump(npkitDumpDir)); - } - MSCCLPPCHECK(NpKit::Shutdown()); -#endif - - return mscclppSuccess; -} - -MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code) -{ - switch (code) { - case mscclppSuccess: - return "no error"; - case mscclppUnhandledCudaError: - return "unhandled cuda error"; - case mscclppSystemError: - return "unhandled system error"; - case mscclppInternalError: - return "internal error"; - case mscclppInvalidArgument: - return "invalid argument"; - case mscclppInvalidUsage: - return "invalid usage"; - case mscclppRemoteError: - return "remote process exited or there was a network error"; - case mscclppInProgress: - return "MSCCL++ operation in progress"; - default: - return "unknown result code"; - } -} - -MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, - mscclppDevConn_t** devConn) -{ - for (int i = 0; i < comm->nConns; i++) { - if (comm->devConns[i].remoteRank == remoteRank && comm->devConns[i].tag == tag) { - *devConn = &comm->devConns[i]; - return mscclppSuccess; - } - } - - return mscclppInvalidArgument; -} - -MSCCLPP_API mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns) -{ - *nConns = comm->nConns; - *devConns = comm->devConns; - return mscclppSuccess; -} - -#if defined(ENABLE_NPKIT) - -static void npkitInitReqIds(struct mscclppComm* comm) -{ - for (int i = 0; i < comm->nConns; i++) { - struct mscclppConn* conn = &comm->conns[i]; - conn->npkitUsedReqIds.resize(0); - conn->npkitFreeReqIds.resize(MSCCLPP_IB_MAX_SENDS); - for (uint64_t j = 0; j < MSCCLPP_IB_MAX_SENDS; j++) { - conn->npkitFreeReqIds[j] = MSCCLPP_IB_MAX_SENDS - j - 1; - } - } -} - -static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint32_t size) -{ - uint64_t reqId = 0; - if (conn->npkitFreeReqIds.size() == 0) { - reqId = conn->npkitUsedReqIds.size(); - } else { - reqId = conn->npkitFreeReqIds.back(); - conn->npkitFreeReqIds.pop_back(); - } - conn->npkitUsedReqIds.push_back(reqId); - NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, NpKit::GetCpuTimestamp(), conn->connId); -} - -static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type) -{ - while (conn->npkitUsedReqIds.size()) { - uint64_t reqId = conn->npkitUsedReqIds.back(); - NpKit::CollectCpuEvent(type, 0, (uint32_t)reqId, NpKit::GetCpuTimestamp(), conn->connId); - conn->npkitFreeReqIds.push_back(reqId); - conn->npkitUsedReqIds.pop_back(); - } -} - -#else - -#define npkitInitReqIds(comm) - -#define npkitCollectEntryEvent(conn, type, size) - -#define npkitCollectExitEvents(conn, type) - -#endif - -struct mscclppHostP2PConn : mscclppHostConn -{ - mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream) - { - } - - void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) - { - put(1, dstDataOffset, 1, srcDataOffset, dataSize); - } - void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, - uint64_t dataSize) - { - void* srcBuff = (void*)((char*)conn->bufferRegistrations[src].data + srcDataOffset); - void* dstBuff = (void*)((char*)conn->remoteBufferRegistrations[dst].data + dstDataOffset); - CUDACHECKNORET(cudaMemcpyAsync(dstBuff, srcBuff, dataSize, cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize); - } - void signal() - { - CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy, - &(conn->devConn->localSignalEpochId->device), sizeof(uint64_t), - cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); - } - void wait() - { - } - void flush() - { - CUDACHECKNORET(cudaStreamSynchronize(p2pStream)); - npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT); - } - - mscclppConn* conn; - cudaStream_t p2pStream; -}; - -struct mscclppHostIBConn : mscclppHostConn -{ - mscclppHostIBConn(mscclppConn* conn) : conn(conn) - { - this->ibQp = NULL; - } - - void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) - { - put(1, dstDataOffset, 1, srcDataOffset, dataSize); - } - void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, - uint64_t dataSize) - { - this->ibQp->stageSend(this->ibMrs[src], this->remoteIbMrInfos[dst], (uint32_t)dataSize, - /*wrId=*/0, /*srcOffset=*/srcDataOffset, /*dstOffset=*/dstDataOffset, /*signaled=*/false); - this->ibQp->postSend(); - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize); - } - void signal() - { - // My local device flag is copied to the remote's proxy flag - this->ibQp->stageSend(this->ibMrs[0], this->remoteIbMrInfos[0], sizeof(uint64_t), - /*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true); - this->ibQp->postSend(); - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); - } - void wait() - { - } - void flush() - { - bool isWaiting = true; - while (isWaiting) { - int wcNum = this->ibQp->pollCq(); - if (wcNum < 0) { - WARN("pollCq failed: errno %d", errno); - continue; - } - for (int i = 0; i < wcNum; ++i) { - struct ibv_wc* wc = (struct ibv_wc*)this->ibQp->getWc(i); - if (wc->status != IBV_WC_SUCCESS) { - WARN("wc status %d", wc->status); - continue; - } - if (wc->opcode == IBV_WC_RDMA_WRITE) { - isWaiting = false; - break; - } - } - } - npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT); - } - - mscclppConn* conn; - mscclpp::IbQp* ibQp; - std::vector ibMrs; - std::vector remoteIbMrInfos; -}; - -MSCCLPP_API mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, - mscclppTransport_t transportType, const char* ibDev) -{ - // save this processes numa binding and set it to the one closest to the device - // so that all the allocation are close to the device - if (comm->devNumaNode == -1) { - // in case this is our first time - MSCCLPPCHECK(getDeviceNumaNode(comm->cudaDev, &comm->devNumaNode)); - INFO(MSCCLPP_INIT, "NUMA node of device %d is set to %d", comm->cudaDev, comm->devNumaNode); - } - // save numa node bitmask to change it back to user's numa node - mscclppNumaState curProcessState; - MSCCLPPCHECK(getNumaState(&curProcessState)); - // change to device's numa node so that the following allocation are close to the device - MSCCLPPCHECK(numaBind(comm->devNumaNode)); - - if (comm->nConns == MAXCONNECTIONS) { - WARN("Too many connections made"); - return mscclppInternalError; - } - int connId = comm->nConns; - struct mscclppConn* conn = &comm->conns[connId]; - conn->connId = connId; - conn->transport = transportType; - conn->buffSize = 0; - - conn->ibCtx = NULL; - int ibDevIdx = -1; - if (transportType == mscclppTransportIB) { - // Check if an IB context exists - int firstNullIdx = -1; - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i] == NULL) { - if (firstNullIdx == -1) { - firstNullIdx = i; - } - } else if (strncmp(comm->ibContext[i]->getDevName().c_str(), ibDev, IBV_SYSFS_NAME_MAX) == 0) { - ibDevIdx = i; - break; - } - } - - // If not, create a new one - if (ibDevIdx == -1) { - // Create a new context. - ibDevIdx = firstNullIdx; - comm->ibContext[ibDevIdx].reset(new mscclpp::IbCtx(std::string(ibDev))); - } - // Set the ib context for this conn - conn->ibCtx = comm->ibContext[ibDevIdx].get(); - - } else if (transportType == mscclppTransportP2P) { - // do the rest of the initialization later - } else if (transportType == mscclppTransportSHM) { - WARN("Shared memory interconnection is not implemented yet!"); - return mscclppInternalError; - } else { - WARN("Unexpected connection type!"); - return mscclppInvalidUsage; - } - - // Find/create a proxy state for the given connection - struct mscclppProxyState* proxyState = NULL; - // First see if there is a matching context - // If not, find the first empty proxy - int firstEmptyProxyIndex = -1; - for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { - struct mscclppProxyState* curProxy = comm->proxyState[i]; - if (curProxy && (curProxy->transportType == transportType)) { - if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) || - (transportType == mscclppTransportP2P)) { - proxyState = curProxy; - break; // we found the matching context - } - } - if (curProxy == NULL && firstEmptyProxyIndex == -1) { - firstEmptyProxyIndex = i; - } - } - - if (proxyState == NULL && firstEmptyProxyIndex == -1) { - WARN("Too many proxies have been allocated!"); - return mscclppInvalidUsage; - } - - // If we couldn't find a matching context, create one - if (proxyState == NULL) { - MSCCLPPCHECK(mscclppCalloc(&proxyState, 1)); - MSCCLPPCHECK(proxyState->fifo.create()); - - if (transportType == mscclppTransportIB) { - proxyState->ibContext = conn->ibCtx; - proxyState->p2pStream = NULL; - } else if (transportType == mscclppTransportP2P) { - proxyState->ibContext = NULL; - CUDACHECK(cudaStreamCreateWithFlags(&proxyState->p2pStream, cudaStreamNonBlocking)); - } - proxyState->numaNodeToBind = comm->devNumaNode; - - // INFO(MSCCLPP_INIT, "NUMA node for device %d is %d", cudaDev, *numaNode); - proxyState->transportType = transportType; - comm->proxyState[firstEmptyProxyIndex] = proxyState; - } - if (proxyState == NULL) { - // Cannot reach - WARN("Proxy allocation failed!"); - return mscclppInternalError; - } - - if (transportType == mscclppTransportIB) { - conn->hostConn = new mscclppHostIBConn(conn); - } else if (transportType == mscclppTransportP2P) { - conn->hostConn = new mscclppHostP2PConn(conn, proxyState->p2pStream); - } - - struct mscclppDevConn* devConn = &comm->devConns[connId]; - - conn->devConn = devConn; - conn->devConn->localBuff = nullptr; - MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->localSignalEpochId, 1)); - MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->waitEpochId, 1)); - conn->devConn->remoteRank = remoteRank; - conn->devConn->tag = tag; - conn->devConn->fifo.connId = connId; -#if defined(MSCCLPP_USE_GDRCOPY) - conn->devConn->fifo.triggerFifo = proxyState->fifo.triggerFifoDev; -#else - conn->devConn->fifo.triggerFifo = proxyState->fifo.triggerFifo; -#endif - conn->devConn->fifo.triggerFifoHead = proxyState->fifo.fifoHead; - conn->devConn->fifo.triggerFifoTail = proxyState->fifo.fifoTailDev; - - comm->nConns++; - - // change the numa binding back to user's - MSCCLPPCHECK(setNumaState(curProcessState)); - - mscclppBufferHandle_t signalHandle = -1; - MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, connId, conn->devConn->localSignalEpochId, - sizeof(mscclppDevConnSignalEpochId), &signalHandle)); - if (signalHandle != 0) { - WARN("signal handle should be 0"); - return mscclppInternalError; - } - - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, - uint64_t buffSize, mscclppTransport_t transportType, const char* ibDev) -{ - int connId = comm->nConns; - MSCCLPPCHECK(mscclppConnectWithoutBuffer(comm, remoteRank, tag, transportType, ibDev)); - struct mscclppConn* conn = &comm->conns[connId]; - - conn->buffSize = buffSize; - conn->devConn->localBuff = localBuff; - - mscclppBufferHandle_t localBuffHandle = -1; - MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, connId, localBuff, buffSize, &localBuffHandle)); - if (localBuffHandle != 1) { - WARN("data buffer handle should be 1"); - return mscclppInternalError; - } - - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, - uint64_t buffSize, mscclppBufferHandle_t* handle) -{ - if (connIdx >= comm->nConns) { - WARN("connIdx out of range"); - return mscclppInvalidArgument; - } - mscclppConn& conn = comm->conns[connIdx]; - *handle = conn.bufferRegistrations.size(); - conn.bufferRegistrations.emplace_back(); - conn.bufferRegistrations.back().data = localBuff; - conn.bufferRegistrations.back().size = buffSize; - - return mscclppSuccess; -} - -struct mscclppBufferRegistrationInfo -{ - cudaIpcMemHandle_t cudaHandle; - mscclpp::IbMrInfo ibMrInfo; - uint64_t size; -}; - -struct connInfo -{ - mscclpp::IbQpInfo infoQp; - std::vector bufferInfos; - - struct header - { - mscclpp::IbQpInfo infoQp; - int numBufferInfos; - }; - - mscclppResult_t sendOverBootstrap(void* bootstrap, int remoteRank, int tag) - { - header h; - h.infoQp = infoQp; - h.numBufferInfos = bufferInfos.size(); - MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, &h, sizeof(header))); - MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, bufferInfos.data(), - bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); - return mscclppSuccess; - } - - mscclppResult_t recvOverBootstrap(void* bootstrap, int remoteRank, int tag) - { - header h; - MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, &h, sizeof(header))); - infoQp = h.infoQp; - bufferInfos.resize(h.numBufferInfos); - MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, bufferInfos.data(), - bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); - return mscclppSuccess; - } -}; - -mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*input*/) -{ - if (conn == NULL) { - WARN("connection cannot be null"); - return mscclppInternalError; - } - - // Add all registered buffers - for (const auto& bufReg : conn->bufferRegistrations) { - connInfo->bufferInfos.emplace_back(); - CUDACHECK(cudaIpcGetMemHandle(&connInfo->bufferInfos.back().cudaHandle, bufReg.data)); - connInfo->bufferInfos.back().size = bufReg.size; - } - return mscclppSuccess; -} - -mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/) -{ - if (connInfo == NULL || conn == NULL) { - WARN("ipcHandles or connection cannot be null"); - return mscclppInternalError; - } - if (connInfo->bufferInfos.size() < 1) { - WARN("at least 1 buffer info expected"); - return mscclppInternalError; - } - - // Open all remote registered buffers - for (size_t i = 0; i < connInfo->bufferInfos.size(); i++) { - mscclppBufferRegistration newBufReg; - CUDACHECK( - cudaIpcOpenMemHandle(&newBufReg.data, connInfo->bufferInfos[i].cudaHandle, cudaIpcMemLazyEnablePeerAccess)); - newBufReg.size = connInfo->bufferInfos[i].size; - conn->remoteBufferRegistrations.push_back(newBufReg); - } - - if (conn->remoteBufferRegistrations[0].size != sizeof(mscclppDevConnSignalEpochId)) { - WARN("buffer registration zero size doesn't match sizeof(mscclppDevConnSignalEpochId)"); - return mscclppInternalError; - } - conn->devConn->remoteSignalEpochId = (mscclppDevConnSignalEpochId*)conn->remoteBufferRegistrations[0].data; - - // For backwards compatibility with the previous API that assumed one data buffer per connection, set the remote - // buffer to the first remote data buffer - if (conn->remoteBufferRegistrations.size() > 1) { - conn->devConn->remoteBuff = conn->remoteBufferRegistrations[1].data; - } - return mscclppSuccess; -} - -mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/) -{ - if (connInfo == NULL || conn == NULL) { - WARN("connInfo or connection cannot be null"); - return mscclppInternalError; - } - struct mscclppDevConn* devConn = conn->devConn; - struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn; - devConn->remoteBuff = NULL; - devConn->remoteSignalEpochId = NULL; - - mscclpp::IbCtx* ibCtx = conn->ibCtx; - if (hostConn->ibQp == NULL) { - hostConn->ibQp = ibCtx->createQp(); - } - - // Add all registered buffers - for (const auto& bufReg : conn->bufferRegistrations) { - hostConn->ibMrs.emplace_back(ibCtx->registerMr(bufReg.data, sizeof(struct mscclppDevConnSignalEpochId))); - connInfo->bufferInfos.emplace_back(); - connInfo->bufferInfos.back().ibMrInfo = hostConn->ibMrs.back()->getInfo(); - connInfo->bufferInfos.back().size = bufReg.size; - } - - connInfo->infoQp = hostConn->ibQp->getInfo(); - return mscclppSuccess; -} - -mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/) -{ - if (connInfo == NULL || conn == NULL) { - WARN("ipcHandles or connection cannot be null"); - return mscclppInternalError; - } - struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn; - hostConn->ibQp->rtr(connInfo->infoQp); - hostConn->ibQp->rts(); - - // No remote pointers to set with IB, so we just set the Mrs - - // Push the Mrs for all the remote registered buffers - for (size_t i = 1; i < connInfo->bufferInfos.size(); i++) { - hostConn->remoteIbMrInfos.push_back(connInfo->bufferInfos[i].ibMrInfo); - - mscclppBufferRegistration newBufReg; - newBufReg.data = nullptr; - newBufReg.size = connInfo->bufferInfos[i].size; - conn->remoteBufferRegistrations.push_back(newBufReg); - } - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm) -{ - // Send info to peers - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn* conn = &comm->conns[i]; - - struct connInfo cInfo; - if (conn->transport == mscclppTransportP2P) { - MSCCLPPCHECK(mscclppP2pConnectionSetupStart(&cInfo, conn)); - } else if (conn->transport == mscclppTransportIB) { - MSCCLPPCHECK(mscclppIbConnectionSetupStart(&cInfo, conn)); - } - // TODO: from saemal: do we possibly deadlock if there are too many outstanding sends? - // MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &cInfo, - // sizeof(cInfo))); - MSCCLPPCHECK(cInfo.sendOverBootstrap(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag)); - } - - // Recv info from peers - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn* conn = &comm->conns[i]; - struct connInfo cInfo; - MSCCLPPCHECK(cInfo.recvOverBootstrap(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag)); - if (conn->transport == mscclppTransportP2P) { - MSCCLPPCHECK(mscclppP2pConnectionSetupEnd(&cInfo, conn)); - } else if (conn->transport == mscclppTransportIB) { - MSCCLPPCHECK(mscclppIbConnectionSetupEnd(&cInfo, conn)); - } - } - - // a barrier to ensure setup on all gpus are done and we can return to the user - MSCCLPPCHECK(mscclppBootstrapBarrier(comm)); - return mscclppSuccess; -} - -struct bufferInfo -{ - cudaIpcMemHandle_t handleBuff; - mscclpp::IbMrInfo infoBuffMr; -}; - -MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size, - mscclppRegisteredMemory* regMem) -{ - std::vector ibMrs; - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn* conn = &comm->conns[i]; - struct bufferInfo bInfo; - const mscclpp::IbMr* ibBuffMr; - - // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB - if (conn->transport == mscclppTransportP2P) { - CUDACHECK(cudaIpcGetMemHandle(&bInfo.handleBuff, local_memory)); - } else if (conn->transport == mscclppTransportIB) { - ibBuffMr = conn->ibCtx->registerMr(local_memory, size); - bInfo.infoBuffMr = ibBuffMr->getInfo(); - ibMrs.emplace_back(ibBuffMr); - } - - MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo))); - } - - // Recv info from peers - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn* conn = &comm->conns[i]; - struct bufferInfo bInfo; - - mscclppRegisteredMemoryP2P p2p; - p2p.IbMr = NULL; - p2p.remoteBuff = NULL; - MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo))); - - // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB - if (conn->transport == mscclppTransportP2P) { - CUDACHECK(cudaIpcOpenMemHandle((void**)&p2p.remoteBuff, bInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess)); - } else if (conn->transport == mscclppTransportIB) { - p2p.IbMr = ibMrs[i]; - } - regMem->p2p.push_back(p2p); - } - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, - void* srcBuff, size_t size, uint32_t srcOffset, - uint32_t dstOffset, int64_t stream) -{ - int ret = 0; - // TODO: transport should be an argument too so user can decide which transport to use - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn* conn = &comm->conns[i]; - // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB - if (conn->transport == mscclppTransportP2P) { - void* dstBuff = regMem->p2p[i].remoteBuff; - CUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, size, cudaMemcpyDeviceToDevice, (cudaStream_t)stream)); - } else { - WARN("mscclppRegisteredBufferWrite not implemented for IB"); - return mscclppInternalError; - // TODO: fix the following (Olli: probably by including the relevant ibBuffMr in the mscclppRegisteredMemory) - // struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn; - // hostConn->ibQp->stageSend(hostConn->ibBuffMr, &hostConn->ibBuffMrRemoteInfo, (uint32_t)size, - // /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, /*signaled=*/false); - // if ((ret = hostConn->ibQp->postSend()) != 0) { - // // Return value is errno. - // WARN("data postSend failed: errno %d", ret); - // } - // // ?? - // // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize, - // // trigger.fields.connId); - } - } - return mscclppSuccess; -} - -// TODO: destroy registered buffer - -MSCCLPP_API mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm) -{ - npkitInitReqIds(comm); - MSCCLPPCHECK(mscclppProxyCreate(comm)); - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppBootstrapBarrier(mscclppComm_t comm) -{ - int* tmp = new int[comm->nRanks]; - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - delete[] tmp; - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppProxyStop(mscclppComm_t comm) -{ - // a barrier to make sure all ranks are done with their work before stopping the proxy - MSCCLPPCHECK(mscclppBootstrapBarrier(comm)); - - MSCCLPPCHECK(mscclppProxyDestroy(comm)); - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppCommRank(mscclppComm_t comm, int* rank) -{ - if (comm == NULL || rank == NULL) { - WARN("comm or rank cannot be null"); - return mscclppInvalidUsage; - } - *rank = comm->rank; - return mscclppSuccess; -} - -MSCCLPP_API mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size) -{ - if (comm == NULL || size == NULL) { - WARN("comm or size cannot be null"); - return mscclppInvalidUsage; - } - *size = comm->nRanks; - return mscclppSuccess; -} - -MSCCLPP_API void mscclppDefaultLogHandler(const char* msg) -{ - mscclppDebugDefaultLogHandler(msg); -} - -MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler) -{ - return mscclppDebugSetLogHandler(handler); -} - -MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout) -{ - mscclppConfig* config = mscclppConfig::getInstance(); - config->setBootstrapConnectionTimeoutConfig(timeout); - return mscclppSuccess; -} diff --git a/src/misc/npkit.cc b/src/npkit/npkit.cc similarity index 99% rename from src/misc/npkit.cc rename to src/npkit/npkit.cc index 30914810..e7fe78f8 100644 --- a/src/misc/npkit.cc +++ b/src/npkit/npkit.cc @@ -3,7 +3,7 @@ #include #include "alloc.h" -#include "npkit/npkit.h" +#include "npkit.h" #include uint64_t NpKit::rank_ = 0; diff --git a/src/include/npkit/npkit.h b/src/npkit/npkit.h similarity index 96% rename from src/include/npkit/npkit.h rename to src/npkit/npkit.h index f0a72dfc..c0cc4710 100644 --- a/src/include/npkit/npkit.h +++ b/src/npkit/npkit.h @@ -3,8 +3,8 @@ #include -#include "npkit/npkit_event.h" -#include "npkit/npkit_struct.h" +#include "npkit_event.h" +#include "npkit_struct.h" class NpKit { diff --git a/src/include/npkit/npkit_event.h b/src/npkit/npkit_event.h similarity index 100% rename from src/include/npkit/npkit_event.h rename to src/npkit/npkit_event.h diff --git a/src/include/npkit/npkit_struct.h b/src/npkit/npkit_struct.h similarity index 100% rename from src/include/npkit/npkit_struct.h rename to src/npkit/npkit_struct.h diff --git a/src/proxy.cc b/src/proxy.cc index c8bf4414..060bbfb0 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -1,213 +1,112 @@ -#include "alloc.h" -#include "checks.h" -#include "comm.h" -#include "debug.h" -#include "ib.hpp" -#include "socket.h" - -#include -#include -#include +#include "api.h" +#include +#include +#include "utils.h" +#include "utils.hpp" +#include #include -#define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100 +namespace mscclpp { -#define PROXYCUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - WARN("CUDA error from proxy: %s", cudaGetErrorString(err)); \ - return NULL; \ - } \ - } while (false) +const int ProxyStopCheckPeriod = 1000; -#define PROXYMSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - if (mscclppDebugNoWarn == 0) \ - INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \ - return NULL; \ - } \ - } while (0); +const int ProxyFlushPeriod = 4; -struct proxyArgs +struct Proxy::Impl { - struct mscclppComm* comm; - struct mscclppProxyState* proxyState; + ProxyHandler handler; + std::function threadInit; + HostProxyFifo fifo; + std::thread service; + std::atomic_bool running; + + Impl(ProxyHandler handler, std::function threadInit) + : handler(handler), threadInit(threadInit), running(false) + { + } }; -mscclppResult_t mscclppProxyFifo::create() +MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function threadInit) { - MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoHead, 1)); -#if defined(MSCCLPP_USE_GDRCOPY) - MSCCLPPCHECK( - mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE, &this->triggerFifoDesc)); - MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc)); -#else - MSCCLPPCHECK(mscclppCudaHostCalloc(&this->triggerFifo, MSCCLPP_PROXY_FIFO_SIZE)); - MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoTailDev, 1)); -#endif - CUDACHECK(cudaStreamCreateWithFlags(&this->stream, cudaStreamNonBlocking)); - this->fifoTailHost = 0; - return mscclppSuccess; + pimpl = std::make_unique(handler, threadInit); } -mscclppResult_t mscclppProxyFifo::destroy() +MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler) : Proxy(handler, [] {}) { - MSCCLPPCHECK(mscclppCudaFree(this->fifoHead)); -#if defined(MSCCLPP_USE_GDRCOPY) - MSCCLPPCHECK(mscclppGdrCudaFree(this->triggerFifoDesc)); - MSCCLPPCHECK(mscclppGdrCudaFree(this->fifoTailDesc)); -#else - MSCCLPPCHECK(mscclppCudaHostFree(this->triggerFifo)); - MSCCLPPCHECK(mscclppCudaFree(this->fifoTailDev)); -#endif - CUDACHECK(cudaStreamDestroy(this->stream)); - return mscclppSuccess; } -// return true if the trigger is valid -mscclppResult_t mscclppProxyFifo::poll(mscclppTrigger* trigger) +MSCCLPP_API_CPP Proxy::~Proxy() { - __m128i xmm0 = _mm_load_si128((__m128i*)&this->triggerFifo[this->fifoTailHost % MSCCLPP_PROXY_FIFO_SIZE]); - _mm_store_si128((__m128i*)trigger, xmm0); - return mscclppSuccess; -} - -mscclppResult_t mscclppProxyFifo::pop() -{ - *(volatile uint64_t*)(&this->triggerFifo[this->fifoTailHost % MSCCLPP_PROXY_FIFO_SIZE]) = 0; - (this->fifoTailHost)++; - return mscclppSuccess; -} - -mscclppResult_t mscclppProxyFifo::flushTail(bool sync) -{ - // Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure - // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush - // request. -#if defined(MSCCLPP_USE_GDRCOPY) - *(volatile uint64_t*)(this->fifoTailDevHostPtr) = this->fifoTailHost; -#else - CUDACHECK( - cudaMemcpyAsync(this->fifoTailDev, &(this->fifoTailHost), sizeof(uint64_t), cudaMemcpyHostToDevice, this->stream)); - if (sync) { - CUDACHECK(cudaStreamSynchronize(this->stream)); - } -#endif - return mscclppSuccess; -} - -static void processTrigger(const mscclppTrigger trigger, mscclppConn* conn) -{ - // Iterate over what send is needed - if (trigger.fields.type & mscclppData) { - conn->hostConn->put(trigger.fields.dstDataOffset, trigger.fields.srcDataOffset, trigger.fields.dataSize); - } - - if (trigger.fields.type & mscclppFlag) { - conn->hostConn->signal(); - } - - // Wait for completion - if (trigger.fields.type & mscclppSync) { - conn->hostConn->flush(); + if (pimpl) { + stop(); } } -void* mscclppProxyService(void* _args) +MSCCLPP_API_CPP void Proxy::start() { - struct proxyArgs* args = (struct proxyArgs*)_args; - struct mscclppComm* comm = args->comm; - struct mscclppProxyState* proxyState = args->proxyState; - free(_args); // allocated in mscclppProxyCreate + pimpl->running = true; + pimpl->service = std::thread([this] { + pimpl->threadInit(); - // from this point on, proxy thread will stay close to the device - PROXYMSCCLPPCHECK(numaBind(comm->devNumaNode)); + ProxyHandler handler = this->pimpl->handler; + HostProxyFifo& fifo = this->pimpl->fifo; + std::atomic_bool& running = this->pimpl->running; + ProxyTrigger trigger; - struct mscclppProxyFifo* fifo = &proxyState->fifo; - volatile mscclppProxyRunState_t* run = &proxyState->run; - mscclppTrigger trigger; + int runCnt = ProxyStopCheckPeriod; + uint64_t flushCnt = 0; + for (;;) { + if (runCnt-- == 0) { + runCnt = ProxyStopCheckPeriod; + if (!running) { + break; + } + } + // Poll to see if we are ready to send anything + fifo.poll(&trigger); + if (trigger.fst == 0) { // TODO: this check is a potential pitfall for custom triggers + continue; // there is one in progress + } - int runCnt = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; - uint64_t flushCnt = 0; - for (;;) { - if (runCnt-- == 0) { - runCnt = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; - if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) { + ProxyHandlerResult result = handler(trigger); + + // Send completion: reset only the high 64 bits + fifo.pop(); + // Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure + // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush + // request. + if ((++flushCnt % ProxyFlushPeriod) == 0 || result == ProxyHandlerResult::FlushFifoTailAndContinue) { + // TODO: relocate this check: || (trigger.fields.type & mscclppSync) + fifo.flushTail(); + } + + if (result == ProxyHandlerResult::Stop) { break; } } - // Poll to see if we are ready to send anything - PROXYMSCCLPPCHECK(fifo->poll(&trigger)); - if (trigger.value[0] == 0) { - continue; // there is one in progreess - } - mscclppConn* conn = &comm->conns[trigger.fields.connId]; - processTrigger(trigger, conn); - - // Send completion: reset only the high 64 bits - PROXYMSCCLPPCHECK(fifo->pop()); - // Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure - // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush - // request. - if (((++flushCnt % MSCCLPP_PROXY_FIFO_FLUSH_COUNTER) == 0) || (trigger.fields.type & mscclppSync)) { - PROXYMSCCLPPCHECK(fifo->flushTail()); - } - } - - // make sure the tail is flushed before we shut the proxy - PROXYMSCCLPPCHECK(fifo->flushTail(/*sync=*/true)); - bool isP2pProxy = (proxyState->ibContext == nullptr); - if (isP2pProxy) { - cudaStream_t p2pStream = proxyState->p2pStream; - PROXYCUDACHECK(cudaStreamSynchronize(p2pStream)); - } - *run = MSCCLPP_PROXY_RUN_STATE_IDLE; - return NULL; + // make sure the tail is flushed before we shut the proxy + fifo.flushTail(/*sync=*/true); + // TODO: do these need to run? + // bool isP2pProxy = (proxyState->ibContext == nullptr); + // if (isP2pProxy) { + // cudaStream_t p2pStream = proxyState->p2pStream; + // PROXYCUDACHECK(cudaStreamSynchronize(p2pStream)); + // } + }); } -mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) +MSCCLPP_API_CPP void Proxy::stop() { - for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { - struct mscclppProxyState* proxyState = comm->proxyState[i]; - if (proxyState == NULL) - break; - - struct proxyArgs* args; - MSCCLPPCHECK(mscclppCalloc(&args, 1)); - args->comm = comm; - args->proxyState = proxyState; - - proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(&proxyState->thread, NULL, mscclppProxyService, args); - if (proxyState->transportType == mscclppTransportP2P) { - mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); - } else if (proxyState->transportType == mscclppTransportIB) { - mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i); - } + pimpl->running = false; + if (pimpl->service.joinable()) { + pimpl->service.join(); } - return mscclppSuccess; } -mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) +MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo() { - for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { - struct mscclppProxyState* proxyState = comm->proxyState[i]; - if (proxyState == NULL) - break; - - volatile int* run = (volatile int*)&proxyState->run; - if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) { - continue; - } - *run = MSCCLPP_PROXY_RUN_STATE_EXITING; - while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { - usleep(1000); - } - } - return mscclppSuccess; + return pimpl->fifo; } + +} // namespace mscclpp diff --git a/src/proxy_cpp.cc b/src/proxy_cpp.cc deleted file mode 100644 index 060bbfb0..00000000 --- a/src/proxy_cpp.cc +++ /dev/null @@ -1,112 +0,0 @@ -#include "api.h" -#include -#include -#include "utils.h" -#include "utils.hpp" -#include -#include - -namespace mscclpp { - -const int ProxyStopCheckPeriod = 1000; - -const int ProxyFlushPeriod = 4; - -struct Proxy::Impl -{ - ProxyHandler handler; - std::function threadInit; - HostProxyFifo fifo; - std::thread service; - std::atomic_bool running; - - Impl(ProxyHandler handler, std::function threadInit) - : handler(handler), threadInit(threadInit), running(false) - { - } -}; - -MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function threadInit) -{ - pimpl = std::make_unique(handler, threadInit); -} - -MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler) : Proxy(handler, [] {}) -{ -} - -MSCCLPP_API_CPP Proxy::~Proxy() -{ - if (pimpl) { - stop(); - } -} - -MSCCLPP_API_CPP void Proxy::start() -{ - pimpl->running = true; - pimpl->service = std::thread([this] { - pimpl->threadInit(); - - ProxyHandler handler = this->pimpl->handler; - HostProxyFifo& fifo = this->pimpl->fifo; - std::atomic_bool& running = this->pimpl->running; - ProxyTrigger trigger; - - int runCnt = ProxyStopCheckPeriod; - uint64_t flushCnt = 0; - for (;;) { - if (runCnt-- == 0) { - runCnt = ProxyStopCheckPeriod; - if (!running) { - break; - } - } - // Poll to see if we are ready to send anything - fifo.poll(&trigger); - if (trigger.fst == 0) { // TODO: this check is a potential pitfall for custom triggers - continue; // there is one in progress - } - - ProxyHandlerResult result = handler(trigger); - - // Send completion: reset only the high 64 bits - fifo.pop(); - // Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure - // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush - // request. - if ((++flushCnt % ProxyFlushPeriod) == 0 || result == ProxyHandlerResult::FlushFifoTailAndContinue) { - // TODO: relocate this check: || (trigger.fields.type & mscclppSync) - fifo.flushTail(); - } - - if (result == ProxyHandlerResult::Stop) { - break; - } - } - - // make sure the tail is flushed before we shut the proxy - fifo.flushTail(/*sync=*/true); - // TODO: do these need to run? - // bool isP2pProxy = (proxyState->ibContext == nullptr); - // if (isP2pProxy) { - // cudaStream_t p2pStream = proxyState->p2pStream; - // PROXYCUDACHECK(cudaStreamSynchronize(p2pStream)); - // } - }); -} - -MSCCLPP_API_CPP void Proxy::stop() -{ - pimpl->running = false; - if (pimpl->service.joinable()) { - pimpl->service.join(); - } -} - -MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo() -{ - return pimpl->fifo; -} - -} // namespace mscclpp