Delete old init code and other C-style code

This commit is contained in:
Olli Saarikivi
2023-05-10 22:03:42 +00:00
parent b2dfd8a8fe
commit ccf45b33a2
17 changed files with 131 additions and 2154 deletions

View File

@@ -1,5 +1,2 @@
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.h)
file(GLOB to_remove gdr.cc)
list(REMOVE_ITEM SOURCES ${to_remove})
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc)
target_sources(mscclpp PRIVATE ${SOURCES})

View File

@@ -1,8 +1,8 @@
#include "bootstrap.h"
#include "api.h"
#include "checks.hpp"
#include <mscclpp/core.hpp>
#include "utils.h"
#include "socket.h"
#include <algorithm>
#include <cstring>
@@ -10,6 +10,7 @@
#include <queue>
#include <thread>
#include <vector>
#include <list>
#include <sys/resource.h>
#include <sys/types.h>
@@ -17,17 +18,6 @@
using namespace mscclpp;
namespace {
// Folds every byte of a bootstrap handle into a 64-bit value.
// The result is only as stable as the handle's in-memory representation.
uint64_t hashUniqueId(const mscclppBootstrapHandle& id)
{
  const char* const first = (const char*)&id;
  const char* const last = first + sizeof(mscclppBootstrapHandle);
  uint64_t hash = 0xdeadbeef;
  for (const char* cur = first; cur != last; ++cur) {
    hash ^= hash >> 32;
    hash *= 0x8db3db47fa2994ad;
    // Bytes are read as plain char, so the promotion follows the platform's char signedness.
    hash += *cur;
  }
  return hash;
}
mscclppResult_t setFilesLimit()
{
@@ -515,551 +505,3 @@ MSCCLPP_API_CPP Bootstrap::~Bootstrap()
{
pimpl_->close();
}
// ------------------- Old bootstrap functions -------------------
// Argument bundle handed to the bootstrapRoot() thread by bootstrapCreateRoot().
// The thread frees both this struct and listenSock before it exits.
struct BootstrapRootArgs
{
// Listening socket on which the root accepts one connection per rank.
struct mscclppSocket* listenSock;
// Magic value passed to mscclppSocketInit() for the root's outgoing sockets.
uint64_t magic;
};
/* Init functions */
static char bootstrapNetIfName[MAX_IF_NAME_SIZE + 1];
static union mscclppSocketAddress bootstrapNetIfAddr;
static int bootstrapNetInitDone = 0;
pthread_mutex_t bootstrapNetLock = PTHREAD_MUTEX_INITIALIZER;
// One-time selection of the network interface used by the bootstrap sockets.
//
// ip_port_pair: optional "<addr>:<port>" string; when NULL the MSCCLPP_COMM_ID
//               environment variable is consulted instead. If either is present, an
//               interface on the same subnet as that address is chosen; otherwise the
//               first interface reported by mscclppFindInterfaces() is used.
// Returns mscclppSuccess once an interface has been recorded in
// bootstrapNetIfName/bootstrapNetIfAddr, or an error code if none was found.
//
// Every exit from the locked region goes through the single unlock below; the
// previous version returned on error with bootstrapNetLock still held, which would
// block any later caller forever.
mscclppResult_t bootstrapNetInit(const char* ip_port_pair)
{
  // NOTE(review): this unlocked pre-check reads a plain int; kept as in the original.
  if (bootstrapNetInitDone != 0) {
    return mscclppSuccess;
  }

  mscclppResult_t res = mscclppSuccess;
  pthread_mutex_lock(&bootstrapNetLock);
  if (bootstrapNetInitDone == 0) {
    const char* env = ip_port_pair ? ip_port_pair : getenv("MSCCLPP_COMM_ID");
    if (env) {
      union mscclppSocketAddress remoteAddr;
      if (mscclppSocketGetAddrFromString(&remoteAddr, env) != mscclppSuccess) {
        WARN("Invalid MSCCLPP_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
        res = mscclppInvalidArgument;
      } else if (mscclppFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr,
                                                 MAX_IF_NAME_SIZE, 1) <= 0) {
        WARN("NET/Socket : No usable listening interface found");
        res = mscclppSystemError;
      }
    } else if (mscclppFindInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, MAX_IF_NAME_SIZE, 1) <= 0) {
      WARN("Bootstrap : no socket interface found");
      res = mscclppInternalError;
    }

    if (res == mscclppSuccess) {
      char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
      // Bounded formatting: the interface name can never overrun the buffer.
      snprintf(line, sizeof(line), " %s:", bootstrapNetIfName);
      mscclppSocketToString(&bootstrapNetIfAddr, line + strlen(line));
      INFO(MSCCLPP_INIT, "Bootstrap : Using%s", line);
      bootstrapNetInitDone = 1;
    }
  }
  pthread_mutex_unlock(&bootstrapNetLock);
  return res;
}
// Additional sync functions
// Sends one length-prefixed message: first the int byte count, then the payload.
// Returns the first socket error encountered, or mscclppSuccess.
static mscclppResult_t bootstrapNetSend(struct mscclppSocket* sock, void* data, int size)
{
  // Header: the payload length, in the sender's native int representation.
  MSCCLPPCHECK(mscclppSocketSend(sock, &size, sizeof(size)));
  // Body: exactly `size` bytes.
  MSCCLPPCHECK(mscclppSocketSend(sock, data, size));
  return mscclppSuccess;
}
// Receives one length-prefixed message written by bootstrapNetSend().
//
// sock: connected socket to read from.
// data: destination buffer.
// size: capacity of `data` in bytes.
// Returns mscclppInternalError if the announced length is negative or larger than
// `size`; otherwise the result of the socket reads.
static mscclppResult_t bootstrapNetRecv(struct mscclppSocket* sock, void* data, int size)
{
  int recvSize;
  MSCCLPPCHECK(mscclppSocketRecv(sock, &recvSize, sizeof(int)));
  // The length comes from the peer, so it must be validated before it is used as a byte count.
  if (recvSize < 0) {
    WARN("Invalid message size : received %d bytes", recvSize);
    return mscclppInternalError;
  }
  if (recvSize > size) {
    WARN("Message truncated : received %d bytes instead of %d", recvSize, size);
    return mscclppInternalError;
  }
  // 0 <= recvSize <= size holds here, so no further clamping is needed.
  MSCCLPPCHECK(mscclppSocketRecv(sock, data, recvSize));
  return mscclppSuccess;
}
// struct ExtInfo
// {
// int rank;
// int nranks;
// union mscclppSocketAddress extAddressListenRoot;
// union mscclppSocketAddress extAddressListen;
// };
#include <sys/resource.h>
// static mscclppResult_t setFilesLimit()
// {
// struct rlimit filesLimit;
// SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit");
// filesLimit.rlim_cur = filesLimit.rlim_max;
// SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit");
// return mscclppSuccess;
// }
// Thread body of the bootstrap root.
//
// rargs: heap-allocated BootstrapRootArgs; this function frees it, and closes and
//        frees the listening socket it carries.
// Phase 1: accept one connection per rank and record the two addresses each rank
//          reports (one for root->rank replies, one for rank<->rank traffic).
// Phase 2: connect back to every rank r and send it the rank-to-rank address of
//          rank (r + 1) % nranks, which forms the AllGather ring.
// Always returns NULL; failures are only logged.
static void* bootstrapRoot(void* rargs)
{
  struct BootstrapRootArgs* args = (struct BootstrapRootArgs*)rargs;
  struct mscclppSocket* listenSock = args->listenSock;
  uint64_t magic = args->magic;
  mscclppResult_t res = mscclppSuccess;
  int nranks = 0, c = 0;
  struct ExtInfo info;
  union mscclppSocketAddress* rankAddresses = NULL;
  union mscclppSocketAddress* rankAddressesRoot = NULL; // for initial rank <-> root information exchange
  // All-zero address used to detect whether a rank's slot is still unused.
  union mscclppSocketAddress* zero = NULL;
  MSCCLPPCHECKGOTO(mscclppCalloc(&zero, 1), res, out);
  setFilesLimit();

  TRACE(MSCCLPP_INIT, "BEGIN");
  /* Receive addresses from all ranks */
  do {
    struct mscclppSocket sock;
    MSCCLPPCHECKGOTO(mscclppSocketInit(&sock), res, out);
    MSCCLPPCHECKGOTO(mscclppSocketAccept(&sock, listenSock), res, out);
    MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &info, sizeof(info)), res, out);
    MSCCLPPCHECKGOTO(mscclppSocketClose(&sock), res, out);

    if (c == 0) {
      // The first rank to check in fixes the communicator size. It arrives over the
      // network, so reject values that cannot be used as an allocation count.
      if (info.nRanks <= 0) {
        WARN("Bootstrap Root : invalid rank count %d", info.nRanks);
        goto out;
      }
      nranks = info.nRanks;
      MSCCLPPCHECKGOTO(mscclppCalloc(&rankAddresses, nranks), res, out);
      MSCCLPPCHECKGOTO(mscclppCalloc(&rankAddressesRoot, nranks), res, out);
    }

    if (nranks != info.nRanks) {
      WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nRanks);
      goto out;
    }

    // info.rank indexes both address tables below; an out-of-range value from a peer
    // would otherwise read and write outside the allocations.
    if (info.rank < 0 || info.rank >= nranks) {
      WARN("Bootstrap Root : rank %d is out of range for %d ranks", info.rank, nranks);
      goto out;
    }

    if (memcmp(zero, &rankAddressesRoot[info.rank], sizeof(union mscclppSocketAddress)) != 0) {
      WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);
      goto out;
    }

    // Save the connection handle for that rank
    memcpy(rankAddressesRoot + info.rank, &info.extAddressListenRoot, sizeof(union mscclppSocketAddress));
    memcpy(rankAddresses + info.rank, &info.extAddressListen, sizeof(union mscclppSocketAddress));

    ++c;
    TRACE(MSCCLPP_INIT, "Received connect from rank %d total %d/%d", info.rank, c, nranks);
  } while (c < nranks);
  TRACE(MSCCLPP_INIT, "COLLECTED ALL %d HANDLES", nranks);

  // Send the connect handle for the next rank in the AllGather ring
  for (int r = 0; r < nranks; ++r) {
    int next = (r + 1) % nranks;
    struct mscclppSocket sock;
    MSCCLPPCHECKGOTO(mscclppSocketInit(&sock, rankAddressesRoot + r, magic, mscclppSocketTypeBootstrap), res, out);
    MSCCLPPCHECKGOTO(mscclppSocketConnect(&sock), res, out);
    MSCCLPPCHECKGOTO(bootstrapNetSend(&sock, rankAddresses + next, sizeof(union mscclppSocketAddress)), res, out);
    MSCCLPPCHECKGOTO(mscclppSocketClose(&sock), res, out);
  }
  TRACE(MSCCLPP_INIT, "SENT OUT ALL %d HANDLES", nranks);

out:
  // Common cleanup for success and failure.
  if (listenSock != NULL) {
    mscclppSocketClose(listenSock);
    free(listenSock);
  }
  if (rankAddresses)
    free(rankAddresses);
  if (rankAddressesRoot)
    free(rankAddressesRoot);
  if (zero)
    free(zero);
  free(rargs);

  TRACE(MSCCLPP_INIT, "DONE");
  return NULL;
}
// Starts the bootstrap root service for the given handle.
// Opens a listening socket initialised from handle->addr and handle->magic, reads the
// socket's address back into handle->addr, then launches a detached thread running
// bootstrapRoot() with that socket.
// On success the thread owns listenSock and args and frees them when it finishes.
// NOTE(review): any early return from the checks after the first mscclppCalloc appears
// to leak listenSock (and args once allocated) — confirm and consider a cleanup path.
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle)
{
struct mscclppSocket* listenSock;
struct BootstrapRootArgs* args;
pthread_t thread;
MSCCLPPCHECK(mscclppCalloc(&listenSock, 1));
MSCCLPPCHECK(mscclppSocketInit(listenSock, &handle->addr, handle->magic, mscclppSocketTypeBootstrap, NULL, 0));
MSCCLPPCHECK(mscclppSocketListen(listenSock));
// Publish the listening address through the handle so ranks can reach the root.
MSCCLPPCHECK(mscclppSocketGetAddr(listenSock, &handle->addr));
MSCCLPPCHECK(mscclppCalloc(&args, 1));
args->listenSock = listenSock;
args->magic = handle->magic;
NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0);
mscclppSetThreadName(thread, "MSCCLPP BootstrapR");
NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d
return mscclppSuccess;
}
// #include <netinet/in.h>
// #include <arpa/inet.h>
// Fills `handle` with the magic value and root address for a new communicator.
//
// handle:       output; zeroed first.
// isRoot:       only consulted when an address string is available; if true the
//               root service is started on that address.
// ip_port_pair: optional "<addr>:<port>" string; falls back to MSCCLPP_COMM_ID.
// Without any address string, a random magic is drawn, the address chosen by
// bootstrapNetInit() is used, and the root service is always started.
mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot, const char* ip_port_pair)
{
  memset(handle, 0, sizeof(mscclppBootstrapHandle));

  const char* commId = ip_port_pair ? ip_port_pair : getenv("MSCCLPP_COMM_ID");
  if (commId == NULL) {
    MSCCLPPCHECK(getRandomData(&handle->magic, sizeof(handle->magic)));
    memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union mscclppSocketAddress));
    MSCCLPPCHECK(bootstrapCreateRoot(handle));
  } else {
    // With an externally supplied address every rank uses the same fixed magic.
    handle->magic = 0xdeadbeef;
    INFO(MSCCLPP_ENV, "MSCCLPP_COMM_ID set by environment to %s", commId);
    if (mscclppSocketGetAddrFromString(&handle->addr, commId) != mscclppSuccess) {
      WARN("Invalid MSCCLPP_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
      return mscclppInvalidArgument;
    }
    if (isRoot) {
      MSCCLPPCHECK(bootstrapCreateRoot(handle));
    }
  }

  // NOTE(review): this print reads the address through its IPv4 view (addr.sin) — confirm for IPv6 handles.
  printf("addr = %s port = %d\n", inet_ntoa(handle->addr.sin.sin_addr), (int)ntohs(handle->addr.sin.sin_port));
  return mscclppSuccess;
}
// Node of the singly linked list of connections that bootstrapRecv() accepted while
// waiting for a different (peer, tag) pair.
struct UnexConn
{
// Rank announced by the sender on this connection.
int peer;
// Tag announced by the sender on this connection.
int tag;
// Byte copy of the accepted socket; the payload has not been read from it yet.
struct mscclppSocket sock;
// Next queued connection, or NULL at the tail.
struct UnexConn* next;
};
// Per-communicator bootstrap state, allocated in bootstrapInit() and passed around as
// an opaque void* commState.
struct BootstrapState
{
// Socket other ranks connect to for ring setup and point-to-point messages.
struct mscclppSocket listenSock;
// Connection accepted from the previous rank in the AllGather ring.
struct mscclppSocket ringRecvSocket;
// Connection to the next rank in the AllGather ring.
struct mscclppSocket ringSendSocket;
// listenSock address of every rank, indexed by rank (filled by an AllGather).
union mscclppSocketAddress* peerCommAddresses;
// Proxy socket address of every rank, indexed by rank (filled by an AllGather).
union mscclppSocketAddress* peerProxyAddresses;
// Head of the queue of connections received before they were asked for.
struct UnexConn* unexpectedConnections;
// NOTE(review): not written anywhere in the visible code — presumably the CUDA device index.
int cudaDev;
int rank;
int nranks;
// Magic value used when initialising this communicator's sockets.
uint64_t magic;
// Copied from comm->abortFlag in bootstrapInit().
volatile uint32_t* abortFlag;
};
// Joins this rank to the bootstrap network described by `handle`.
// Sequence: allocate BootstrapState and attach it to comm->bootstrap; open two
// listening sockets (one for peers, one for the root's reply); report both addresses
// to the root; receive the next rank's address from the root; connect the send side
// and accept the receive side of the AllGather ring; AllGather the peer listen
// addresses; create a proxy listen socket and AllGather its addresses.
// Returns the first error reported by any step, or mscclppSuccess.
// NOTE(review): early returns leave `state`, the sockets and proxySocket allocated;
// the allocated proxySocket is also not stored anywhere visible here — confirm ownership.
mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm)
{
int rank = comm->rank;
int nranks = comm->nRanks;
struct BootstrapState* state;
struct mscclppSocket* proxySocket;
mscclppSocketAddress nextAddr;
struct mscclppSocket sock, listenSockRoot;
struct ExtInfo info;
MSCCLPPCHECK(mscclppCalloc(&state, 1));
state->rank = rank;
state->nranks = nranks;
state->abortFlag = comm->abortFlag;
comm->bootstrap = state;
comm->magic = state->magic = handle->magic;
TRACE(MSCCLPP_INIT, "rank %d nranks %d", rank, nranks);
info.rank = rank;
info.nRanks = nranks;
// Create socket for other ranks to contact me
MSCCLPPCHECK(mscclppSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeBootstrap,
comm->abortFlag));
MSCCLPPCHECK(mscclppSocketListen(&state->listenSock));
MSCCLPPCHECK(mscclppSocketGetAddr(&state->listenSock, &info.extAddressListen));
// Create socket for root to contact me
MSCCLPPCHECK(
mscclppSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag));
MSCCLPPCHECK(mscclppSocketListen(&listenSockRoot));
MSCCLPPCHECK(mscclppSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot));
// stagger connection times to avoid an overload of the root
if (nranks > 128) {
// Each rank waits `rank` milliseconds before contacting the root.
long msec = rank;
struct timespec tv;
tv.tv_sec = msec / 1000;
tv.tv_nsec = 1000000 * (msec % 1000);
TRACE(MSCCLPP_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
(void)nanosleep(&tv, NULL);
}
// send info on my listening socket to root
MSCCLPPCHECK(mscclppSocketInit(&sock, &handle->addr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag));
MSCCLPPCHECK(mscclppSocketConnect(&sock));
MSCCLPPCHECK(bootstrapNetSend(&sock, &info, sizeof(info)));
MSCCLPPCHECK(mscclppSocketClose(&sock));
// get info on my "next" rank in the bootstrap ring from root
MSCCLPPCHECK(mscclppSocketInit(&sock));
MSCCLPPCHECK(mscclppSocketAccept(&sock, &listenSockRoot));
MSCCLPPCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union mscclppSocketAddress)));
MSCCLPPCHECK(mscclppSocketClose(&sock));
MSCCLPPCHECK(mscclppSocketClose(&listenSockRoot));
MSCCLPPCHECK(
mscclppSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, mscclppSocketTypeBootstrap, comm->abortFlag));
MSCCLPPCHECK(mscclppSocketConnect(&state->ringSendSocket));
// Accept the connect request from the previous rank in the AllGather ring
MSCCLPPCHECK(mscclppSocketInit(&state->ringRecvSocket));
MSCCLPPCHECK(mscclppSocketAccept(&state->ringRecvSocket, &state->listenSock));
// AllGather all listen handlers
MSCCLPPCHECK(mscclppCalloc(&state->peerCommAddresses, nranks));
MSCCLPPCHECK(mscclppSocketGetAddr(&state->listenSock, state->peerCommAddresses + rank));
MSCCLPPCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union mscclppSocketAddress)));
// Create the service proxy
MSCCLPPCHECK(mscclppCalloc(&state->peerProxyAddresses, nranks));
// proxy is aborted through a message; don't set abortFlag
// NOTE(review): the call below does pass comm->abortFlag, contradicting the line above — confirm which is intended.
MSCCLPPCHECK(mscclppCalloc(&proxySocket, 1));
MSCCLPPCHECK(
mscclppSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, mscclppSocketTypeProxy, comm->abortFlag));
MSCCLPPCHECK(mscclppSocketListen(proxySocket));
MSCCLPPCHECK(mscclppSocketGetAddr(proxySocket, state->peerProxyAddresses + rank));
MSCCLPPCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union mscclppSocketAddress)));
// MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses));
TRACE(MSCCLPP_INIT, "rank %d nranks %d - DONE", rank, nranks);
return mscclppSuccess;
}
// Ring AllGather over the bootstrap sockets.
//
// commState: BootstrapState created by bootstrapInit().
// allData:   buffer of nranks slices of `size` bytes each; the caller has already
//            filled in slice [rank], and on return every slice is populated.
// size:      bytes per slice.
mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size)
{
  struct BootstrapState* st = (struct BootstrapState*)commState;
  char* base = (char*)allData;
  const int rank = st->rank;
  const int nranks = st->nranks;

  TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d", rank, nranks, size);

  // At step s, forward slice (rank - s) to the right neighbour and receive slice
  // (rank - s - 1) from the left neighbour; after nranks - 1 steps all slices are in.
  for (int step = 0; step < nranks - 1; step++) {
    const size_t sendSlice = (rank - step + nranks) % nranks;
    const size_t recvSlice = (rank - step - 1 + nranks) % nranks;

    MSCCLPPCHECK(bootstrapNetSend(&st->ringSendSocket, base + sendSlice * size, size));
    MSCCLPPCHECK(bootstrapNetRecv(&st->ringRecvSocket, base + recvSlice * size, size));
  }

  TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return mscclppSuccess;
}
// Sends one tagged message to `peer` over a fresh connection.
// Wire order: sender rank, tag, then the payload (each length-prefixed).
// The socket is closed on every path; a close failure is returned by MSCCLPPCHECK,
// otherwise the result of the send sequence is returned.
mscclppResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size)
{
  mscclppResult_t status = mscclppSuccess;
  struct BootstrapState* st = (struct BootstrapState*)commState;
  struct mscclppSocket conn;

  MSCCLPPCHECKGOTO(mscclppSocketInit(&conn, st->peerCommAddresses + peer, st->magic, mscclppSocketTypeBootstrap,
                                     st->abortFlag),
                   status, cleanup);
  MSCCLPPCHECKGOTO(mscclppSocketConnect(&conn), status, cleanup);
  MSCCLPPCHECKGOTO(bootstrapNetSend(&conn, &st->rank, sizeof(int)), status, cleanup);
  MSCCLPPCHECKGOTO(bootstrapNetSend(&conn, &tag, sizeof(int)), status, cleanup);
  MSCCLPPCHECKGOTO(bootstrapNetSend(&conn, data, size), status, cleanup);

cleanup:
  // Reached on both success and failure.
  MSCCLPPCHECK(mscclppSocketClose(&conn));
  return status;
}
// Barrier across the `nranks` participants listed in `ranks`.
//
// commState: BootstrapState created by bootstrapInit().
// ranks:     maps a participant index (0..nranks-1) to its global rank.
// rank:      this participant's index into `ranks`.
// nranks:    number of participants; a single participant returns immediately.
// tag:       message tag shared by all participants of this barrier.
mscclppResult_t bootstrapBarrier(void* commState, int* ranks, int rank, int nranks, int tag)
{
  if (nranks == 1)
    return mscclppSuccess;
  TRACE(MSCCLPP_INIT, "rank %d nranks %d tag %x - ENTER", rank, nranks, tag);

  /* Simple intra process barrier
   *
   * Based on the dissemination algorithm by Debra Hensgen, Raphael Finkel, and Udi Manber,
   * "Two Algorithms for Barrier Synchronization," International Journal of Parallel Programming, 17(1):1-17, 1988"
   */
  // The payload carries no information, but it is written to the socket, so it is
  // zero-initialised rather than sending indeterminate stack contents.
  int data[1] = {0};
  for (int mask = 1; mask < nranks; mask <<= 1) {
    int src = (rank - mask + nranks) % nranks;
    int dst = (rank + mask) % nranks;
    MSCCLPPCHECK(bootstrapSend(commState, ranks[dst], tag, data, sizeof(data)));
    MSCCLPPCHECK(bootstrapRecv(commState, ranks[src], tag, data, sizeof(data)));
  }

  TRACE(MSCCLPP_INIT, "rank %d nranks %d tag %x - DONE", rank, nranks, tag);
  return mscclppSuccess;
}
// AllGather among the `nranks` participants listed in `ranks`, using point-to-point
// bootstrap messages. `allData` holds nranks slices of `size` bytes indexed by
// participant index; slice [rank] must be filled in by the caller.
mscclppResult_t bootstrapIntraNodeAllGather(void* commState, int* ranks, int rank, int nranks, void* allData, int size)
{
  if (nranks == 1)
    return mscclppSuccess;

  char* slices = (char*)allData;
  char* const mine = slices + rank * size;
  TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - ENTER", rank, nranks, size);

  // Round `dist` pairs this participant with the ones `dist` positions ahead and
  // behind; the round number doubles as the message tag.
  for (int dist = 1; dist < nranks; dist++) {
    const int from = (rank - dist + nranks) % nranks;
    const int to = (rank + dist) % nranks;
    MSCCLPPCHECK(bootstrapSend(commState, ranks[to], /*tag=*/dist, mine, size));
    MSCCLPPCHECK(bootstrapRecv(commState, ranks[from], /*tag=*/dist, slices + from * size, size));
  }

  TRACE(MSCCLPP_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return mscclppSuccess;
}
// Appends a connection that arrived before anyone asked for it to the tail of
// state->unexpectedConnections. The socket is stored as a byte copy of *sock.
mscclppResult_t unexpectedEnqueue(struct BootstrapState* state, int peer, int tag, struct mscclppSocket* sock)
{
  struct UnexConn* node;
  MSCCLPPCHECK(mscclppCalloc(&node, 1));
  node->peer = peer;
  node->tag = tag;
  memcpy(&node->sock, sock, sizeof(struct mscclppSocket));

  // Walk to the link that currently terminates the list (the head pointer itself
  // when the list is empty) and hang the new node there.
  struct UnexConn** tail = &state->unexpectedConnections;
  while (*tail != NULL) {
    tail = &(*tail)->next;
  }
  *tail = node;
  return mscclppSuccess;
}
// Removes the first queued connection matching (peer, tag), if any.
// On a match the stored socket is copied into *sock and *found is set to 1;
// otherwise *found is 0 and *sock is left untouched. Always returns mscclppSuccess.
mscclppResult_t unexpectedDequeue(struct BootstrapState* state, int peer, int tag, struct mscclppSocket* sock,
                                  int* found)
{
  *found = 0;
  // `link` points at whichever pointer leads to the current node, so unlinking is
  // the same operation for the head and for interior nodes.
  for (struct UnexConn** link = &state->unexpectedConnections; *link != NULL; link = &(*link)->next) {
    struct UnexConn* cur = *link;
    if (cur->peer != peer || cur->tag != tag) {
      continue;
    }
    *link = cur->next;
    memcpy(sock, &cur->sock, sizeof(struct mscclppSocket));
    free(cur);
    *found = 1;
    return mscclppSuccess;
  }
  return mscclppSuccess;
}
// Frees every node of state->unexpectedConnections and resets the list to empty.
// The head pointer is cleared so the state never keeps a pointer to freed nodes
// (the previous version left it dangling).
// NOTE(review): the sockets stored in the nodes are not closed here — confirm whether they should be.
static void unexpectedFree(struct BootstrapState* state)
{
  struct UnexConn* elem = state->unexpectedConnections;
  state->unexpectedConnections = NULL;
  while (elem) {
    struct UnexConn* next = elem->next;
    free(elem);
    elem = next;
  }
}
// We can't know who we'll receive from, so we need to receive everything at once
// Receives the message tagged `tag` from rank `peer` into `data` (capacity `size`).
// Connections from other (peer, tag) pairs that arrive in the meantime are parked on
// the unexpected-connection queue and served by a later call.
// Returns the first error from the accept/receive sequence, or mscclppSuccess.
// NOTE(review): the exit path closes `sock` even when mscclppSocketInit/Accept failed —
// confirm mscclppSocketClose tolerates that.
mscclppResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size)
{
mscclppResult_t ret = mscclppSuccess;
struct BootstrapState* state = (struct BootstrapState*)commState;
struct mscclppSocket sock;
int newPeer, newTag;
// Search unexpected connections first
int found;
MSCCLPPCHECK(unexpectedDequeue(state, peer, tag, &sock, &found));
if (found) {
// Rank and tag were consumed when the connection was queued; only the payload remains.
MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
goto exit;
}
// Then look for new connections
while (1) {
MSCCLPPCHECKGOTO(mscclppSocketInit(&sock), ret, fail);
MSCCLPPCHECKGOTO(mscclppSocketAccept(&sock, &state->listenSock), ret, fail);
// Each connection starts with the sender's rank and tag (see bootstrapSend).
MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &newPeer, sizeof(int)), ret, fail);
MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, &newTag, sizeof(int)), ret, fail);
if (newPeer == peer && newTag == tag) {
MSCCLPPCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
goto exit;
}
// Unexpected connection. Save for later.
MSCCLPPCHECKGOTO(unexpectedEnqueue(state, newPeer, newTag, &sock), ret, fail);
}
exit:
// Reached on success and, via `fail`, on error.
MSCCLPPCHECK(mscclppSocketClose(&sock));
return ret;
fail:
goto exit;
}
// Tears down the bootstrap state created by bootstrapInit().
//
// commState: BootstrapState pointer, or NULL (treated as already closed, matching
//            bootstrapAbort()).
// Returns mscclppInternalError if unexpected connections were still queued and the
// abort flag is not set; otherwise closes the sockets and frees the state.
mscclppResult_t bootstrapClose(void* commState)
{
  if (commState == NULL)
    return mscclppSuccess;
  struct BootstrapState* state = (struct BootstrapState*)commState;

  if (state->unexpectedConnections != NULL) {
    unexpectedFree(state);
    if (*state->abortFlag == 0) {
      // NOTE(review): this early return leaves the sockets and `state` allocated — confirm intent.
      WARN("Unexpected connections are not empty");
      return mscclppInternalError;
    }
  }

  MSCCLPPCHECK(mscclppSocketClose(&state->listenSock));
  MSCCLPPCHECK(mscclppSocketClose(&state->ringSendSocket));
  MSCCLPPCHECK(mscclppSocketClose(&state->ringRecvSocket));

  free(state->peerCommAddresses);
  // bootstrapInit() allocates this table too; it was previously released only by bootstrapAbort().
  free(state->peerProxyAddresses);
  free(state);
  return mscclppSuccess;
}
// Unconditional teardown of the bootstrap state: closes the three sockets and frees
// both address tables and the state itself. A NULL commState is a no-op.
mscclppResult_t bootstrapAbort(void* commState)
{
  if (commState == NULL) {
    return mscclppSuccess;
  }

  struct BootstrapState* st = (struct BootstrapState*)commState;
  MSCCLPPCHECK(mscclppSocketClose(&st->listenSock));
  MSCCLPPCHECK(mscclppSocketClose(&st->ringSendSocket));
  MSCCLPPCHECK(mscclppSocketClose(&st->ringRecvSocket));

  free(st->peerCommAddresses);
  free(st->peerProxyAddresses);
  free(st);
  return mscclppSuccess;
}

45
src/c_style_remnants.cc Normal file
View File

@@ -0,0 +1,45 @@
#include "mscclpp.h"
#include "debug.h"
#include "config.h"
#include "api.h"
// Public C entry point that forwards `msg` to mscclppDebugDefaultLogHandler().
MSCCLPP_API void mscclppDefaultLogHandler(const char* msg)
{
mscclppDebugDefaultLogHandler(msg);
}
// Public C entry point that installs `handler` via mscclppDebugSetLogHandler() and
// returns that function's result unchanged.
MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler)
{
return mscclppDebugSetLogHandler(handler);
}
// Stores `timeout` as the bootstrap connection timeout in the mscclppConfig instance.
// NOTE(review): the unit of `timeout` is not visible here — confirm against mscclppConfig.
// Always returns mscclppSuccess.
MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout)
{
  mscclppConfig::getInstance()->setBootstrapConnectionTimeoutConfig(timeout);
  return mscclppSuccess;
}
// Maps a result code to a static, human-readable description.
// Codes without a dedicated message yield "unknown result code".
MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code)
{
  const char* text = "unknown result code";
  switch (code) {
  case mscclppSuccess:
    text = "no error";
    break;
  case mscclppUnhandledCudaError:
    text = "unhandled cuda error";
    break;
  case mscclppSystemError:
    text = "unhandled system error";
    break;
  case mscclppInternalError:
    text = "internal error";
    break;
  case mscclppInvalidArgument:
    text = "invalid argument";
    break;
  case mscclppInvalidUsage:
    text = "invalid usage";
    break;
  case mscclppRemoteError:
    text = "remote process exited or there was a network error";
    break;
  case mscclppInProgress:
    text = "MSCCL++ operation in progress";
    break;
  default:
    break;
  }
  return text;
}

View File

@@ -2,7 +2,6 @@
#include "api.h"
#include "checks.hpp"
#include "comm.h"
#include "communicator.hpp"
#include "connection.hpp"
#include "debug.h"

View File

@@ -1,75 +0,0 @@
#include "gdr.h"
// Used to make the GDR library calls thread safe
pthread_mutex_t gdrLock = PTHREAD_MUTEX_INITIALIZER;
// Thin wrapper around gdr_open(); returns its handle unchanged.
// Unlike the pin/map wrappers in this file, the call is not made under gdrLock.
gdr_t wrap_gdr_open(void)
{
return gdr_open();
}
// Closes a GDR handle; a non-zero gdr_close() status is logged and reported as
// mscclppSystemError. The call is not made under gdrLock.
mscclppResult_t wrap_gdr_close(gdr_t g)
{
  const int ret = gdr_close(g);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_close() failed: %d", ret);
  return mscclppSystemError;
}
// Calls gdr_pin_buffer() under gdrLock; the arguments are passed through unchanged.
// A non-zero status is logged and reported as mscclppSystemError.
mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space,
                                    gdr_mh_t* handle)
{
  int ret;
  GDRLOCKCALL(gdr_pin_buffer(g, addr, size, p2p_token, va_space, handle), ret);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_pin_buffer(addr %lx, size %zi) failed: %d", addr, size, ret);
  return mscclppSystemError;
}
// Calls gdr_unpin_buffer() under gdrLock.
// A non-zero status is logged and reported as mscclppSystemError.
mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle)
{
  int ret;
  GDRLOCKCALL(gdr_unpin_buffer(g, handle), ret);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_unpin_buffer(handle %lx) failed: %d", handle.h, ret);
  return mscclppSystemError;
}
// Calls gdr_get_info() under gdrLock, filling *info.
// A non-zero status is logged and reported as mscclppSystemError.
mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info)
{
  int ret;
  GDRLOCKCALL(gdr_get_info(g, handle, info), ret);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_get_info(handle %lx) failed: %d", handle.h, ret);
  return mscclppSystemError;
}
// Calls gdr_map() under gdrLock, storing the resulting address in *va.
// A non-zero status is logged and reported as mscclppSystemError.
mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void** va, size_t size)
{
  int ret;
  GDRLOCKCALL(gdr_map(g, handle, va, size), ret);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_map(handle %lx, size %zi) failed: %d", handle.h, size, ret);
  return mscclppSystemError;
}
// Calls gdr_unmap() under gdrLock for a mapping obtained from wrap_gdr_map().
// A non-zero status is logged and reported as mscclppSystemError.
mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size)
{
  int ret;
  GDRLOCKCALL(gdr_unmap(g, handle, va, size), ret);
  if (ret == 0) {
    return mscclppSuccess;
  }
  WARN("gdr_unmap(handle %lx, va %p, size %zi) failed: %d", handle.h, va, size, ret);
  return mscclppSystemError;
}

View File

@@ -8,13 +8,14 @@
#include "alloc.h"
#include "api.h"
#include "checks.hpp"
#include "comm.h"
#include "debug.h"
#include "ib.hpp"
#include <mscclpp/core.hpp>
#include <infiniband/verbs.h>
#include <string>
#define MAXCONNECTIONS 64
namespace mscclpp {
IbMr::IbMr(void* pd, void* buff, std::size_t size) : buff(buff)

View File

@@ -1,26 +0,0 @@
#pragma once
#include "mscclpp.h"
#include "socket.h"
#include "comm.h"
// ------------------- Old bootstrap headers: to be removed -------------------
// Identifies a bootstrap root: the address ranks connect to plus the magic value
// that sockets of this communicator are initialised with.
struct mscclppBootstrapHandle
{
// Either the fixed 0xdeadbeef (address supplied by the user) or random data; see bootstrapGetUniqueId().
uint64_t magic;
// Listening address of the bootstrap root.
union mscclppSocketAddress addr;
};
mscclppResult_t bootstrapNetInit(const char* ip_port_pair = NULL);
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle);
mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot = true,
const char* ip_port_pair = NULL);
mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm);
mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size);
mscclppResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size);
mscclppResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size);
mscclppResult_t bootstrapBarrier(void* commState, int* ranks, int rank, int nranks, int tag);
mscclppResult_t bootstrapIntraNodeAllGather(void* commState, int* ranks, int rank, int nranks, void* allData, int size);
mscclppResult_t bootstrapClose(void* commState);
mscclppResult_t bootstrapAbort(void* commState);

View File

@@ -1,65 +0,0 @@
/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_COMM_H_
#define MSCCLPP_COMM_H_
#include "ib.hpp"
#include "proxy.h"
#include <memory>
#include <vector>
#define MAXCONNECTIONS 64
// Describes one registered buffer as a (pointer, size) pair.
struct mscclppBufferRegistration
{
void* data;
// NOTE(review): presumably the buffer length in bytes — confirm against the registration code.
uint64_t size;
};
// State of a single connection to a remote rank.
// NOTE(review): field meanings below are inferred from names only; the code that
// fills this struct is outside this view — confirm before relying on them.
struct mscclppConn
{
int connId;
mscclppTransport_t transport;
int remoteRank;
uint64_t buffSize;
struct mscclppDevConn* devConn;
struct mscclppHostConn* hostConn;
// Buffers registered on this side and on the remote side of the connection (presumably).
std::vector<mscclppBufferRegistration> bufferRegistrations;
std::vector<mscclppBufferRegistration> remoteBufferRegistrations;
mscclpp::IbCtx* ibCtx;
#if defined(ENABLE_NPKIT)
// Request-id bookkeeping that exists only in NPKit-enabled builds.
std::vector<uint64_t> npkitUsedReqIds;
std::vector<uint64_t> npkitFreeReqIds;
#endif
};
// Legacy C-style communicator object.
// NOTE(review): mscclppCommInitRank() obtains this struct from mscclppCalloc() and
// releases it with free(), although it contains std::vector and std::unique_ptr
// members whose constructors/destructors would then never run — confirm.
struct mscclppComm
{
// Fixed-capacity connection tables; nConns is presumably the number of slots in use.
struct mscclppConn conns[MAXCONNECTIONS];
struct mscclppDevConn devConns[MAXCONNECTIONS];
int nConns;
// Opaque BootstrapState pointer set by bootstrapInit().
void* bootstrap;
// Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
uint64_t magic;
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator
int cudaDev; // my cuda device index
int devNumaNode; // my device's NUMA node
// Flag to ask MSCCLPP kernels to abort
volatile uint32_t* abortFlag;
std::unique_ptr<mscclpp::IbCtx> ibContext[MSCCLPP_IB_MAX_DEVS];
struct mscclppProxyState* proxyState[MSCCLPP_PROXY_MAX_NUM];
};
#endif

View File

@@ -1,156 +0,0 @@
#ifndef MSCCLPP_GDR_H_
#define MSCCLPP_GDR_H_
#include "align.h"
#include "alloc.h"
#include "checks.h"
#include "debug.h"
#include "gdrapi.h"
// These can be used if the GDR library isn't thread safe
#include <pthread.h>
extern pthread_mutex_t gdrLock;
#define GDRLOCK() pthread_mutex_lock(&gdrLock)
#define GDRUNLOCK() pthread_mutex_unlock(&gdrLock)
#define GDRLOCKCALL(cmd, ret) \
do { \
GDRLOCK(); \
ret = cmd; \
GDRUNLOCK(); \
} while (false)
#define GDRCHECK(cmd) \
do { \
int e; \
/* GDRLOCKCALL(cmd, e); */ \
e = cmd; \
if (e != 0) { \
WARN("GDRCOPY failure %d", e); \
return mscclppSystemError; \
} \
} while (false)
gdr_t wrap_gdr_open(void);
mscclppResult_t wrap_gdr_close(gdr_t g);
mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space,
gdr_mh_t* handle);
mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle);
mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info);
mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void** va, size_t size);
mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size);
// Global GDR driver handle
extern gdr_t mscclppGdrCopy;
// Bookkeeping for one GDRCopy allocation made by mscclppGdrCudaCallocDebug() and
// released by mscclppGdrCudaFree().
typedef struct gdr_mem_desc
{
// Pointer returned by mscclppCudaCalloc(); this is what cudaFree() is called on.
void* gdrDevMem;
// Address returned by wrap_gdr_map() for the pinned region.
void* gdrMap;
// (info.va - aligned address) plus the alignment padding added to gdrDevMem.
size_t gdrOffset;
// Size of the pinned and mapped region, rounded up to GPU_PAGE_SIZE.
size_t gdrMapSize;
// GDR memory handle from wrap_gdr_pin_buffer().
gdr_mh_t gdrMh;
} gdr_mem_desc_t;
// Opens the GDR driver and returns its handle (whatever wrap_gdr_open() yields).
//
// A library/driver version gate used to live here (query both versions via
// wrap_gdr_runtime_get_version / wrap_gdr_driver_get_version, require GDRAPI 2.1 or
// later, close the handle and return NULL otherwise). It is disabled, so no version
// check is performed.
static gdr_t mscclppGdrInit()
{
  return wrap_gdr_open();
}
// Allocates zeroed device memory for `nelem` elements of T and maps it through GDRCopy.
//
// ptr:      out; address inside the GDR mapping that corresponds to the allocation.
// devPtr:   out, optional (may be NULL); the matching device address.
// nelem:    number of T elements requested.
// gdrDesc:  out; heap-allocated gdr_mem_desc_t to pass to mscclppGdrCudaFree().
// filefunc, line: call-site information used only for logging.
// Returns mscclppSuccess, or the first error from allocation/pin/map/info.
//
// Fixes relative to the previous version:
//  - the stream-capture mode swapped at entry is now restored on success as well as
//    on failure;
//  - *devPtr is only written when devPtr is non-NULL, matching the later NULL check;
//  - the TRACE call uses the MSCCLPP_INIT flag (it was spelled mscclpp_INIT).
// NOTE(review): on failure, resources acquired by earlier steps (device memory, pin,
// mapping) are not released — confirm whether cleanup is needed.
template <typename T>
mscclppResult_t mscclppGdrCudaCallocDebug(T** ptr, T** devPtr, size_t nelem, void** gdrDesc, const char* filefunc,
                                          int line)
{
  mscclppResult_t result = mscclppSuccess;
  cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
  *ptr = nullptr;
  if (devPtr)
    *devPtr = nullptr;
  *gdrDesc = nullptr;
  // Swap in relaxed capture mode; the previous mode is left in `mode` for the restore below.
  CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));

  // Declared without initialisers because `goto finish` jumps past this point.
  gdr_info_t info;
  size_t mapSize;
  gdr_mh_t mh;
  char* devMem;
  void* gdrMap;
  ssize_t off;
  gdr_mem_desc_t* md;
  uint64_t alignedAddr;
  size_t align;

  mapSize = sizeof(T) * nelem;

  // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE
  ALIGN_SIZE(mapSize, GPU_PAGE_SIZE);
  // GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too
  MSCCLPPCHECKGOTO(mscclppCudaCalloc(&devMem, mapSize + GPU_PAGE_SIZE - 1), result, finish);
  alignedAddr = (((uint64_t)devMem) + GPU_PAGE_OFFSET) & GPU_PAGE_MASK;
  align = alignedAddr - (uint64_t)devMem;
  MSCCLPPCHECKGOTO(wrap_gdr_pin_buffer(mscclppGdrCopy, alignedAddr, mapSize, 0, 0, &mh), result, finish);

  MSCCLPPCHECKGOTO(wrap_gdr_map(mscclppGdrCopy, mh, &gdrMap, mapSize), result, finish);

  MSCCLPPCHECKGOTO(wrap_gdr_get_info(mscclppGdrCopy, mh, &info), result, finish);

  // Will offset ever be non zero ?
  off = info.va - alignedAddr;

  MSCCLPPCHECKGOTO(mscclppCalloc(&md, 1), result, finish);
  md->gdrDevMem = devMem;
  md->gdrMap = gdrMap;
  md->gdrMapSize = mapSize;
  md->gdrOffset = off + align;
  md->gdrMh = mh;
  *gdrDesc = md;

  *ptr = (T*)((char*)gdrMap + off);
  if (devPtr)
    *devPtr = (T*)(devMem + off + align);

  TRACE(MSCCLPP_INIT, "GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zi at %p", md->gdrDevMem,
        md->gdrMap, md->gdrOffset, md->gdrMh.h, md->gdrMapSize, *ptr);

  // Restore the caller's stream-capture mode before reporting success.
  CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
  return mscclppSuccess;

finish:
  CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
  if (*ptr == nullptr)
    WARN("Failed to CUDA calloc %ld bytes", nelem * sizeof(T));
  INFO(MSCCLPP_ALLOC, "%s:%d Cuda Alloc Size %ld pointer %p", filefunc, line, nelem * sizeof(T), *ptr);
  return result;
}
#define mscclppGdrCudaCalloc(...) mscclppGdrCudaCallocDebug(__VA_ARGS__, __FILE__, __LINE__)
// Releases an allocation made by mscclppGdrCudaCalloc(): unmaps and unpins the GDR
// region, frees the device memory, then frees the descriptor itself.
// Returns at the first failing step, leaving the remaining steps undone.
static mscclppResult_t mscclppGdrCudaFree(void* gdrDesc)
{
  gdr_mem_desc_t* desc = (gdr_mem_desc_t*)gdrDesc;

  MSCCLPPCHECK(wrap_gdr_unmap(mscclppGdrCopy, desc->gdrMh, desc->gdrMap, desc->gdrMapSize));
  MSCCLPPCHECK(wrap_gdr_unpin_buffer(mscclppGdrCopy, desc->gdrMh));
  CUDACHECK(cudaFree(desc->gdrDevMem));

  free(desc);
  return mscclppSuccess;
}
#endif

View File

@@ -1,52 +0,0 @@
#ifndef MSCCLPP_REGISTERED_PTR_HPP_
#define MSCCLPP_REGISTERED_PTR_HPP_
namespace mscclpp {
template <typename T> class RegisteredPtr
{
RegisteredMemory memory;
size_t offset;
public:
RegisteredPtr(RegisteredMemory memory, size_t offset) : memory(memory), offset(offset)
{
}
RegisteredPtr(RegisteredMemory memory) : RegisteredPtr(memory, 0)
{
}
~RegisteredPtr()
{
}
RegisteredMemory memory()
{
return memory;
}
T* data()
{
return reinterpret_cast<T*>(memory.data());
}
size_t size()
{
return memory.size() / sizeof(T);
}
size_t offset()
{
return offset;
}
RegisteredPtr<T> operator+(size_t offset)
{
return RegisteredPtr<T>(memory, this->offset + offset);
}
// TODO: all other relevant overloads
};
} // namespace mscclpp
#endif // MSCCLPP_REGISTERED_PTR_HPP_

View File

@@ -1,920 +0,0 @@
#include "alloc.h"
#include "api.h"
#include "bootstrap.h"
#include "checks.h"
#include "config.h"
#if defined(MSCCLPP_USE_GDRCOPY)
#include "gdr.h"
#endif
#include "infiniband/verbs.h"
#include "mscclpp.h"
#include <map>
#include <sstream>
#include <vector>
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
// Folds every byte of a unique id into a 64-bit value.
// Its only visible use is the TRACE_CALL in mscclppGetUniqueId().
static uint64_t hashUniqueId(mscclppUniqueId const& id)
{
  char const* const first = (char const*)&id;
  char const* const last = first + sizeof(mscclppUniqueId);
  uint64_t hash = 0xdeadbeef;
  for (char const* cur = first; cur != last; ++cur) {
    hash ^= hash >> 32;
    hash *= 0x8db3db47fa2994ad;
    // Bytes are read as plain char, so the promotion follows the platform's char signedness.
    hash += *cur;
  }
  return hash;
}
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
// static size_t maxLocalSizeBytes = 0;
#if defined(MSCCLPP_USE_GDRCOPY)
gdr_t mscclppGdrCopy = NULL;
// Opens the global GDR handle on first use; later calls are no-ops once it is set.
// Returns mscclppSystemError if the driver handle could not be opened.
// NOTE(review): the check-then-set on mscclppGdrCopy is unsynchronised — confirm callers serialise it.
mscclppResult_t initGdrCopy()
{
  if (mscclppGdrCopy != NULL) {
    return mscclppSuccess;
  }
  mscclppGdrCopy = mscclppGdrInit();
  if (mscclppGdrCopy != NULL) {
    return mscclppSuccess;
  }
  WARN("GDR init failed");
  return mscclppSystemError;
}
#endif
// Process-wide one-time initialisation: currently just the bootstrap network.
//
// Returns mscclppSuccess once initialisation has completed (now or earlier), or the
// error from bootstrapNetInit(). After a failure `initialized` stays false, so a
// later call retries.
//
// The failure path now goes through the unlock below; previously MSCCLPPCHECK
// returned with initLock still held, blocking every later caller.
static mscclppResult_t mscclppInit()
{
  if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE))
    return mscclppSuccess;

  mscclppResult_t res = mscclppSuccess;
  pthread_mutex_lock(&initLock);
  if (!initialized) {
    // Always initialize bootstrap network
    MSCCLPPCHECKGOTO(bootstrapNetInit(), res, unlock);
    __atomic_store_n(&initialized, true, __ATOMIC_RELEASE);
  }
unlock:
  pthread_mutex_unlock(&initLock);
  return res;
}
// Builds the name "mscclpp.<magic>.<rank>" for the given communicator and rank.
static std::string mscclppShmFileName(mscclppComm_t comm, int rank)
{
  std::stringstream name;
  name << "mscclpp.";
  // std::hex stays set on the stream, so both the magic and the rank are written in hexadecimal.
  name << std::hex;
  name << comm->magic;
  name << ".";
  name << rank;
  return name.str();
}
// Initializes the library if needed and fills `*out` with a new unique id obtained from
// bootstrapGetUniqueId().
//
// out: destination for the id; must not be null.
// Returns mscclppInvalidArgument when `out` is null, the error from mscclppInit() if initialization
// fails, and otherwise whatever bootstrapGetUniqueId() returns.
MSCCLPP_API mscclppResult_t mscclppGetUniqueId(mscclppUniqueId* out)
{
  // `out` is written by bootstrapGetUniqueId() and read back for the trace below, so reject null
  // before doing any work.
  if (out == NULL) {
    WARN("out cannot be null");
    return mscclppInvalidArgument;
  }
  MSCCLPPCHECK(mscclppInit());
  mscclppResult_t res = bootstrapGetUniqueId((struct mscclppBootstrapHandle*)out);
  TRACE_CALL("mscclppGetUniqueId(0x%llx)", (unsigned long long)hashUniqueId(*out));
  return res;
}
// Public wrapper that forwards to bootstrapAllGather() on the communicator's bootstrap state.
// Returns mscclppSuccess when the call passes MSCCLPPCHECK.
// NOTE(review): presumably `data` must hold nRanks * size bytes (mscclppBootstrapBarrier passes an
// array with one int per rank and size == sizeof(int)) — confirm against bootstrapAllGather.
MSCCLPP_API mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int size)
{
MSCCLPPCHECK(bootstrapAllGather(comm->bootstrap, data, size));
return mscclppSuccess;
}
MSCCLPP_API mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank)
{
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(initGdrCopy());
#endif
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
// uint64_t hash = getHostHash();
// uint64_t *hashes;
// std::map<uint64_t, int> hashToNode;
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail);
_comm->rank = rank;
_comm->nRanks = nranks;
_comm->devNumaNode = -1;
// We assume that the user has set the device to the intended one already
CUDACHECK(cudaGetDevice(&_comm->cudaDev));
MSCCLPPCHECK(bootstrapNetInit(ipPortPair));
mscclppBootstrapHandle handle;
MSCCLPPCHECK(bootstrapGetUniqueId(&handle, rank == 0, ipPortPair));
_comm->magic = handle.magic;
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECK(bootstrapInit(&handle, _comm));
#if defined(ENABLE_NPKIT)
// Init NPKit
MSCCLPPCHECK(NpKit::Init(_comm->rank));
#endif
*comm = _comm;
return res;
fail:
if (_comm) {
if (_comm->abortFlag)
mscclppCudaHostFree((void*)_comm->abortFlag);
free(_comm);
}
if (comm)
*comm = NULL;
return res;
}
MSCCLPP_API mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank)
{
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(initGdrCopy());
#endif
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
mscclppBootstrapHandle* handle = (mscclppBootstrapHandle*)&id;
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail);
_comm->rank = rank;
_comm->nRanks = nranks;
// We assume that the user has set the device to the intended one already
CUDACHECK(cudaGetDevice(&_comm->cudaDev));
MSCCLPPCHECK(bootstrapNetInit());
_comm->magic = handle->magic;
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t**)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECK(bootstrapInit(handle, _comm));
#if defined(ENABLE_NPKIT)
// Init NPKit
MSCCLPPCHECK(NpKit::Init(_comm->rank));
#endif
*comm = _comm;
return res;
fail:
if (_comm) {
if (_comm->abortFlag)
mscclppCudaHostFree((void*)_comm->abortFlag);
free(_comm);
}
if (comm)
*comm = NULL;
return res;
}
// Releases everything owned by `comm`: proxy states, IB contexts, per-connection device flags and
// host connections, the bootstrap state, the abort flag and the communicator itself.
// A NULL communicator is accepted and treated as already destroyed.
// With ENABLE_NPKIT, NPKit events are dumped to $NPKIT_DUMP_DIR (if set) and NPKit is shut down.
MSCCLPP_API mscclppResult_t mscclppCommDestroy(mscclppComm_t comm)
{
#if defined(ENABLE_NPKIT)
  const char* npkitDumpDir = nullptr;
#endif

  if (comm == NULL)
    return mscclppSuccess;

  // Proxy states: destroy the FIFO, then the P2P stream when one exists, then the state itself.
  for (int proxyIdx = 0; proxyIdx < MSCCLPP_PROXY_MAX_NUM; ++proxyIdx) {
    struct mscclppProxyState* proxy = comm->proxyState[proxyIdx];
    if (!proxy) {
      continue;
    }
    MSCCLPPCHECK(proxy->fifo.destroy());
    if (proxy->p2pStream) {
      CUDACHECK(cudaStreamDestroy(proxy->p2pStream));
    }
    free(proxy);
  }

  // IB contexts that were created for this communicator.
  for (int devIdx = 0; devIdx < MSCCLPP_IB_MAX_DEVS; ++devIdx) {
    if (!comm->ibContext[devIdx]) {
      continue;
    }
    comm->ibContext[devIdx].reset(nullptr);
  }

  // Per-connection resources. The address of an array element is never null, so no null test is
  // needed on the connection itself; deleting a null hostConn is a no-op.
  for (int connIdx = 0; connIdx < comm->nConns; ++connIdx) {
    struct mscclppConn& connection = comm->conns[connIdx];
    MSCCLPPCHECK(mscclppCudaFree(connection.devConn->localSignalEpochId));
    MSCCLPPCHECK(mscclppCudaFree(connection.devConn->waitEpochId));
    delete connection.hostConn;
  }

  if (comm->bootstrap) {
    MSCCLPPCHECK(bootstrapClose(comm->bootstrap));
  }

  mscclppCudaHostFree((void*)comm->abortFlag);
  free(comm);

#if defined(ENABLE_NPKIT)
  // Dump NPKit events and shutdown
  npkitDumpDir = getenv("NPKIT_DUMP_DIR");
  if (npkitDumpDir != nullptr) {
    MSCCLPPCHECK(NpKit::Dump(npkitDumpDir));
  } else {
    WARN("NPKIT_DUMP_DIR is empty");
  }
  MSCCLPPCHECK(NpKit::Shutdown());
#endif

  return mscclppSuccess;
}
// Maps a result code to a static, human-readable description.
// Codes that are not listed yield "unknown result code".
MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code)
{
  struct CodeText
  {
    mscclppResult_t code;
    const char* text;
  };
  static const CodeText knownCodes[] = {
    {mscclppSuccess, "no error"},
    {mscclppUnhandledCudaError, "unhandled cuda error"},
    {mscclppSystemError, "unhandled system error"},
    {mscclppInternalError, "internal error"},
    {mscclppInvalidArgument, "invalid argument"},
    {mscclppInvalidUsage, "invalid usage"},
    {mscclppRemoteError, "remote process exited or there was a network error"},
    {mscclppInProgress, "MSCCL++ operation in progress"},
  };

  for (const CodeText& entry : knownCodes) {
    if (entry.code == code) {
      return entry.text;
    }
  }
  return "unknown result code";
}
// Looks up the device connection registered for (remoteRank, tag).
// On a match, stores its address in `*devConn` and returns mscclppSuccess; the first match in
// connection order wins. Returns mscclppInvalidArgument when no connection matches.
MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag,
                                                       mscclppDevConn_t** devConn)
{
  for (int idx = 0; idx < comm->nConns; ++idx) {
    mscclppDevConn_t& candidate = comm->devConns[idx];
    const bool matches = candidate.remoteRank == remoteRank && candidate.tag == tag;
    if (!matches) {
      continue;
    }
    *devConn = &candidate;
    return mscclppSuccess;
  }
  return mscclppInvalidArgument;
}
// Exposes the communicator's device-connection array and its length.
// `*devConns` points at the communicator's own storage (no copy is made) and `*nConns` receives
// the number of connections. Always returns mscclppSuccess.
MSCCLPP_API mscclppResult_t mscclppGetAllDeviceConnections(mscclppComm_t comm, mscclppDevConn_t** devConns, int* nConns)
{
  *devConns = comm->devConns;
  *nConns = comm->nConns;
  return mscclppSuccess;
}
#if defined(ENABLE_NPKIT)
// Resets the NPKit request-id bookkeeping of every connection: the used list becomes empty and the
// free list holds MSCCLPP_IB_MAX_SENDS ids in descending order, so the smallest id is handed out
// first when ids are taken from the back.
static void npkitInitReqIds(struct mscclppComm* comm)
{
  for (int connIdx = 0; connIdx < comm->nConns; ++connIdx) {
    struct mscclppConn& connection = comm->conns[connIdx];
    connection.npkitUsedReqIds.resize(0);
    connection.npkitFreeReqIds.resize(MSCCLPP_IB_MAX_SENDS);
    uint64_t nextId = MSCCLPP_IB_MAX_SENDS;
    for (auto& slot : connection.npkitFreeReqIds) {
      slot = --nextId;
    }
  }
}
// Records an NPKit CPU "entry" event of the given type and size for `conn`.
// The event is tagged with a request id taken from the back of the free list; when the free list is
// empty, the current number of used ids is used instead. The id is then appended to the used list.
static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint32_t size)
{
  uint64_t reqId = 0;
  if (!conn->npkitFreeReqIds.empty()) {
    reqId = conn->npkitFreeReqIds.back();
    conn->npkitFreeReqIds.pop_back();
  } else {
    reqId = conn->npkitUsedReqIds.size();
  }
  conn->npkitUsedReqIds.push_back(reqId);
  NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, NpKit::GetCpuTimestamp(), conn->connId);
}
// Emits one NPKit CPU "exit" event (size 0) per outstanding request id of `conn`, newest first,
// and moves each id from the used list back to the free list.
static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type)
{
  while (!conn->npkitUsedReqIds.empty()) {
    const uint64_t finishedId = conn->npkitUsedReqIds.back();
    NpKit::CollectCpuEvent(type, 0, (uint32_t)finishedId, NpKit::GetCpuTimestamp(), conn->connId);
    conn->npkitFreeReqIds.push_back(finishedId);
    conn->npkitUsedReqIds.pop_back();
  }
}
#else
// Without ENABLE_NPKIT the NPKit hooks expand to nothing, so call sites need no #if guards.
#define npkitInitReqIds(comm)
#define npkitCollectEntryEvent(conn, type, size)
#define npkitCollectExitEvents(conn, type)
#endif
// Host-side connection that moves data with cudaMemcpyAsync on a dedicated CUDA stream.
struct mscclppHostP2PConn : mscclppHostConn
{
  mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream)
  {
  }

  // Single-buffer form: uses buffer handle 1 on both sides, which is the handle mscclppConnect()
  // expects for the data buffer.
  void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
  {
    put(1, dstDataOffset, 1, srcDataOffset, dataSize);
  }

  // Enqueues an asynchronous device-to-device copy of `dataSize` bytes from local buffer `src` (at
  // srcDataOffset) to remote buffer `dst` (at dstDataOffset). Completion is observed via flush().
  void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset,
           uint64_t dataSize)
  {
    char* const srcBase = (char*)conn->bufferRegistrations[src].data;
    char* const dstBase = (char*)conn->remoteBufferRegistrations[dst].data;
    void* const from = (void*)(srcBase + srcDataOffset);
    void* const to = (void*)(dstBase + dstDataOffset);
    CUDACHECKNORET(cudaMemcpyAsync(to, from, dataSize, cudaMemcpyDeviceToDevice, p2pStream));
    npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize);
  }

  // Copies the local `device` signal epoch into the remote side's `proxy` signal epoch.
  void signal()
  {
    CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy,
                                   &(conn->devConn->localSignalEpochId->device), sizeof(uint64_t),
                                   cudaMemcpyDeviceToDevice, p2pStream));
    npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
  }

  // Intentionally empty for this transport.
  void wait()
  {
  }

  // Blocks until everything enqueued on the P2P stream has finished.
  void flush()
  {
    CUDACHECKNORET(cudaStreamSynchronize(p2pStream));
    npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT);
  }

  mscclppConn* conn;
  cudaStream_t p2pStream;
};
struct mscclppHostIBConn : mscclppHostConn
{
mscclppHostIBConn(mscclppConn* conn) : conn(conn)
{
this->ibQp = NULL;
}
void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
{
put(1, dstDataOffset, 1, srcDataOffset, dataSize);
}
void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset,
uint64_t dataSize)
{
this->ibQp->stageSend(this->ibMrs[src], this->remoteIbMrInfos[dst], (uint32_t)dataSize,
/*wrId=*/0, /*srcOffset=*/srcDataOffset, /*dstOffset=*/dstDataOffset, /*signaled=*/false);
this->ibQp->postSend();
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize);
}
void signal()
{
// My local device flag is copied to the remote's proxy flag
this->ibQp->stageSend(this->ibMrs[0], this->remoteIbMrInfos[0], sizeof(uint64_t),
/*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true);
this->ibQp->postSend();
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
}
void wait()
{
}
void flush()
{
bool isWaiting = true;
while (isWaiting) {
int wcNum = this->ibQp->pollCq();
if (wcNum < 0) {
WARN("pollCq failed: errno %d", errno);
continue;
}
for (int i = 0; i < wcNum; ++i) {
struct ibv_wc* wc = (struct ibv_wc*)this->ibQp->getWc(i);
if (wc->status != IBV_WC_SUCCESS) {
WARN("wc status %d", wc->status);
continue;
}
if (wc->opcode == IBV_WC_RDMA_WRITE) {
isWaiting = false;
break;
}
}
}
npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT);
}
mscclppConn* conn;
mscclpp::IbQp* ibQp;
std::vector<const mscclpp::IbMr*> ibMrs;
std::vector<mscclpp::IbMrInfo> remoteIbMrInfos;
};
MSCCLPP_API mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag,
mscclppTransport_t transportType, const char* ibDev)
{
// save this processes numa binding and set it to the one closest to the device
// so that all the allocation are close to the device
if (comm->devNumaNode == -1) {
// in case this is our first time
MSCCLPPCHECK(getDeviceNumaNode(comm->cudaDev, &comm->devNumaNode));
INFO(MSCCLPP_INIT, "NUMA node of device %d is set to %d", comm->cudaDev, comm->devNumaNode);
}
// save numa node bitmask to change it back to user's numa node
mscclppNumaState curProcessState;
MSCCLPPCHECK(getNumaState(&curProcessState));
// change to device's numa node so that the following allocation are close to the device
MSCCLPPCHECK(numaBind(comm->devNumaNode));
if (comm->nConns == MAXCONNECTIONS) {
WARN("Too many connections made");
return mscclppInternalError;
}
int connId = comm->nConns;
struct mscclppConn* conn = &comm->conns[connId];
conn->connId = connId;
conn->transport = transportType;
conn->buffSize = 0;
conn->ibCtx = NULL;
int ibDevIdx = -1;
if (transportType == mscclppTransportIB) {
// Check if an IB context exists
int firstNullIdx = -1;
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i] == NULL) {
if (firstNullIdx == -1) {
firstNullIdx = i;
}
} else if (strncmp(comm->ibContext[i]->getDevName().c_str(), ibDev, IBV_SYSFS_NAME_MAX) == 0) {
ibDevIdx = i;
break;
}
}
// If not, create a new one
if (ibDevIdx == -1) {
// Create a new context.
ibDevIdx = firstNullIdx;
comm->ibContext[ibDevIdx].reset(new mscclpp::IbCtx(std::string(ibDev)));
}
// Set the ib context for this conn
conn->ibCtx = comm->ibContext[ibDevIdx].get();
} else if (transportType == mscclppTransportP2P) {
// do the rest of the initialization later
} else if (transportType == mscclppTransportSHM) {
WARN("Shared memory interconnection is not implemented yet!");
return mscclppInternalError;
} else {
WARN("Unexpected connection type!");
return mscclppInvalidUsage;
}
// Find/create a proxy state for the given connection
struct mscclppProxyState* proxyState = NULL;
// First see if there is a matching context
// If not, find the first empty proxy
int firstEmptyProxyIndex = -1;
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState* curProxy = comm->proxyState[i];
if (curProxy && (curProxy->transportType == transportType)) {
if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) ||
(transportType == mscclppTransportP2P)) {
proxyState = curProxy;
break; // we found the matching context
}
}
if (curProxy == NULL && firstEmptyProxyIndex == -1) {
firstEmptyProxyIndex = i;
}
}
if (proxyState == NULL && firstEmptyProxyIndex == -1) {
WARN("Too many proxies have been allocated!");
return mscclppInvalidUsage;
}
// If we couldn't find a matching context, create one
if (proxyState == NULL) {
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
MSCCLPPCHECK(proxyState->fifo.create());
if (transportType == mscclppTransportIB) {
proxyState->ibContext = conn->ibCtx;
proxyState->p2pStream = NULL;
} else if (transportType == mscclppTransportP2P) {
proxyState->ibContext = NULL;
CUDACHECK(cudaStreamCreateWithFlags(&proxyState->p2pStream, cudaStreamNonBlocking));
}
proxyState->numaNodeToBind = comm->devNumaNode;
// INFO(MSCCLPP_INIT, "NUMA node for device %d is %d", cudaDev, *numaNode);
proxyState->transportType = transportType;
comm->proxyState[firstEmptyProxyIndex] = proxyState;
}
if (proxyState == NULL) {
// Cannot reach
WARN("Proxy allocation failed!");
return mscclppInternalError;
}
if (transportType == mscclppTransportIB) {
conn->hostConn = new mscclppHostIBConn(conn);
} else if (transportType == mscclppTransportP2P) {
conn->hostConn = new mscclppHostP2PConn(conn, proxyState->p2pStream);
}
struct mscclppDevConn* devConn = &comm->devConns[connId];
conn->devConn = devConn;
conn->devConn->localBuff = nullptr;
MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->localSignalEpochId, 1));
MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->waitEpochId, 1));
conn->devConn->remoteRank = remoteRank;
conn->devConn->tag = tag;
conn->devConn->fifo.connId = connId;
#if defined(MSCCLPP_USE_GDRCOPY)
conn->devConn->fifo.triggerFifo = proxyState->fifo.triggerFifoDev;
#else
conn->devConn->fifo.triggerFifo = proxyState->fifo.triggerFifo;
#endif
conn->devConn->fifo.triggerFifoHead = proxyState->fifo.fifoHead;
conn->devConn->fifo.triggerFifoTail = proxyState->fifo.fifoTailDev;
comm->nConns++;
// change the numa binding back to user's
MSCCLPPCHECK(setNumaState(curProcessState));
mscclppBufferHandle_t signalHandle = -1;
MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, connId, conn->devConn->localSignalEpochId,
sizeof(mscclppDevConnSignalEpochId), &signalHandle));
if (signalHandle != 0) {
WARN("signal handle should be 0");
return mscclppInternalError;
}
return mscclppSuccess;
}
// Creates a connection to `remoteRank` identified by `tag` and attaches `localBuff` (of `buffSize`
// bytes) as its data buffer. The data buffer must end up with buffer handle 1; anything else is
// reported as mscclppInternalError.
MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff,
                                           uint64_t buffSize, mscclppTransport_t transportType, const char* ibDev)
{
  // The new connection takes the slot at the current connection count, so remember that index
  // before the count is advanced by the call below.
  const int newConnId = comm->nConns;
  MSCCLPPCHECK(mscclppConnectWithoutBuffer(comm, remoteRank, tag, transportType, ibDev));

  struct mscclppConn& connection = comm->conns[newConnId];
  connection.buffSize = buffSize;
  connection.devConn->localBuff = localBuff;

  mscclppBufferHandle_t dataHandle = -1;
  MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, newConnId, localBuff, buffSize, &dataHandle));
  if (dataHandle == 1) {
    return mscclppSuccess;
  }
  WARN("data buffer handle should be 1");
  return mscclppInternalError;
}
// Appends (localBuff, buffSize) to the buffer registrations of connection `connIdx`.
//
// handle: receives the index of the new registration within that connection.
// Returns mscclppInvalidArgument when `connIdx` does not name an existing connection.
MSCCLPP_API mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff,
                                                               uint64_t buffSize, mscclppBufferHandle_t* handle)
{
  // Reject negative indices as well as indices past the last connection; either would index
  // outside comm->conns.
  if (connIdx < 0 || connIdx >= comm->nConns) {
    WARN("connIdx out of range");
    return mscclppInvalidArgument;
  }
  mscclppConn& conn = comm->conns[connIdx];
  // The handle is the position the new registration is about to occupy.
  *handle = conn.bufferRegistrations.size();
  conn.bufferRegistrations.emplace_back();
  auto& registration = conn.bufferRegistrations.back();
  registration.data = localBuff;
  registration.size = buffSize;
  return mscclppSuccess;
}
// Description of one registered buffer as exchanged between peers during connection setup.
struct mscclppBufferRegistrationInfo
{
// CUDA IPC handle of the buffer; filled in by mscclppP2pConnectionSetupStart().
cudaIpcMemHandle_t cudaHandle;
// IB memory-region info of the buffer; filled in by mscclppIbConnectionSetupStart().
mscclpp::IbMrInfo ibMrInfo;
// Size recorded for the buffer registration.
uint64_t size;
};
// Per-connection setup information exchanged with the peer over the bootstrap network: the IB queue
// pair info plus one mscclppBufferRegistrationInfo per registered buffer.
struct connInfo
{
  mscclpp::IbQpInfo infoQp;
  std::vector<mscclppBufferRegistrationInfo> bufferInfos;

  // Fixed-size part sent first so the receiver knows how many buffer infos follow.
  struct header
  {
    mscclpp::IbQpInfo infoQp;
    int numBufferInfos;
  };

  // Sends the header followed by the raw array of buffer infos to `remoteRank` using `tag`.
  mscclppResult_t sendOverBootstrap(void* bootstrap, int remoteRank, int tag)
  {
    header h;
    h.infoQp = infoQp;
    h.numBufferInfos = static_cast<int>(bufferInfos.size());
    MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, &h, sizeof(header)));
    MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, bufferInfos.data(),
                               bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo)));
    return mscclppSuccess;
  }

  // Receives the header and then the announced number of buffer infos from `remoteRank`.
  // Returns mscclppRemoteError when the announced count is negative.
  mscclppResult_t recvOverBootstrap(void* bootstrap, int remoteRank, int tag)
  {
    header h;
    MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, &h, sizeof(header)));
    // The count comes from the peer; a negative value would turn into an enormous size in the
    // resize() below.
    if (h.numBufferInfos < 0) {
      WARN("received invalid buffer info count %d", h.numBufferInfos);
      return mscclppRemoteError;
    }
    infoQp = h.infoQp;
    bufferInfos.resize(static_cast<size_t>(h.numBufferInfos));
    MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, bufferInfos.data(),
                               bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo)));
    return mscclppSuccess;
  }
};
// Prepares the local half of a P2P connection: appends one buffer info (CUDA IPC handle and size)
// to `connInfo` for every buffer registered on `conn`.
// Returns mscclppInternalError when either pointer is null.
mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/)
{
  // connInfo is written below, so it needs the same null check as conn.
  if (connInfo == NULL || conn == NULL) {
    WARN("connInfo or connection cannot be null");
    return mscclppInternalError;
  }
  // Add all registered buffers
  for (const auto& bufReg : conn->bufferRegistrations) {
    connInfo->bufferInfos.emplace_back();
    CUDACHECK(cudaIpcGetMemHandle(&connInfo->bufferInfos.back().cudaHandle, bufReg.data));
    connInfo->bufferInfos.back().size = bufReg.size;
  }
  return mscclppSuccess;
}
// Completes a P2P connection from the peer's `connInfo`: opens every remote buffer through CUDA IPC
// and records it on `conn`. Remote buffer 0 must be the peer's signal epoch and becomes
// devConn->remoteSignalEpochId; remote buffer 1, when present, becomes devConn->remoteBuff.
// Returns mscclppInternalError on null arguments, an empty buffer list, or a size mismatch on
// buffer 0.
mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/)
{
  if (connInfo == NULL || conn == NULL) {
    WARN("ipcHandles or connection cannot be null");
    return mscclppInternalError;
  }
  if (connInfo->bufferInfos.empty()) {
    WARN("at least 1 buffer info expected");
    return mscclppInternalError;
  }

  // Open all remote registered buffers
  for (const auto& remoteInfo : connInfo->bufferInfos) {
    mscclppBufferRegistration remoteReg;
    CUDACHECK(cudaIpcOpenMemHandle(&remoteReg.data, remoteInfo.cudaHandle, cudaIpcMemLazyEnablePeerAccess));
    remoteReg.size = remoteInfo.size;
    conn->remoteBufferRegistrations.push_back(remoteReg);
  }

  const auto& remoteSignalReg = conn->remoteBufferRegistrations[0];
  if (remoteSignalReg.size != sizeof(mscclppDevConnSignalEpochId)) {
    WARN("buffer registration zero size doesn't match sizeof(mscclppDevConnSignalEpochId)");
    return mscclppInternalError;
  }
  conn->devConn->remoteSignalEpochId = (mscclppDevConnSignalEpochId*)remoteSignalReg.data;

  // For backwards compatibility with the previous API that assumed one data buffer per connection, set the remote
  // buffer to the first remote data buffer
  const bool hasDataBuffer = conn->remoteBufferRegistrations.size() > 1;
  if (hasDataBuffer) {
    conn->devConn->remoteBuff = conn->remoteBufferRegistrations[1].data;
  }
  return mscclppSuccess;
}
// Prepares the local half of an IB connection: creates the queue pair if needed, registers an IB
// memory region for every buffer registered on `conn`, and fills `connInfo` with the resulting MR
// infos, buffer sizes and the queue pair info.
// Returns mscclppInternalError when either pointer is null.
mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/)
{
  if (connInfo == NULL || conn == NULL) {
    WARN("connInfo or connection cannot be null");
    return mscclppInternalError;
  }
  struct mscclppDevConn* devConn = conn->devConn;
  struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn;
  // Remote device pointers are not used by the IB transport.
  devConn->remoteBuff = NULL;
  devConn->remoteSignalEpochId = NULL;

  mscclpp::IbCtx* ibCtx = conn->ibCtx;
  if (hostConn->ibQp == NULL) {
    hostConn->ibQp = ibCtx->createQp();
  }

  // Add all registered buffers
  for (const auto& bufReg : conn->bufferRegistrations) {
    // Register each buffer with its own size: the memory region has to cover the whole buffer that
    // is advertised to the peer through bufferInfos.back().size below.
    hostConn->ibMrs.emplace_back(ibCtx->registerMr(bufReg.data, bufReg.size));
    connInfo->bufferInfos.emplace_back();
    connInfo->bufferInfos.back().ibMrInfo = hostConn->ibMrs.back()->getInfo();
    connInfo->bufferInfos.back().size = bufReg.size;
  }
  connInfo->infoQp = hostConn->ibQp->getInfo();
  return mscclppSuccess;
}
// Completes an IB connection from the peer's `connInfo`: moves the queue pair to RTR and RTS and
// records the peer's memory-region info for every remote buffer.
// Returns mscclppInternalError when either pointer is null.
mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/)
{
  if (connInfo == NULL || conn == NULL) {
    WARN("connInfo or connection cannot be null");
    return mscclppInternalError;
  }
  struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn;
  hostConn->ibQp->rtr(connInfo->infoQp);
  hostConn->ibQp->rts();

  // No remote pointers to set with IB, so we just set the Mrs

  // Push the Mrs for all the remote registered buffers.
  // Start at index 0 so that remoteIbMrInfos is indexed by buffer handle: signal() addresses the
  // peer's signal buffer as remoteIbMrInfos[0] and put() addresses data buffers by their handle.
  for (size_t i = 0; i < connInfo->bufferInfos.size(); i++) {
    hostConn->remoteIbMrInfos.push_back(connInfo->bufferInfos[i].ibMrInfo);
    mscclppBufferRegistration newBufReg;
    newBufReg.data = nullptr;
    newBufReg.size = connInfo->bufferInfos[i].size;
    conn->remoteBufferRegistrations.push_back(newBufReg);
  }
  return mscclppSuccess;
}
// Finishes the setup of every connection created so far. First the local setup info of each
// connection is sent to its peer, then the peer's info is received and applied per connection, and
// finally all ranks meet at a bootstrap barrier before returning.
MSCCLPP_API mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
{
  // Send info to peers
  for (int idx = 0; idx < comm->nConns; ++idx) {
    struct mscclppConn& connection = comm->conns[idx];
    struct connInfo localInfo;
    switch (connection.transport) {
    case mscclppTransportP2P:
      MSCCLPPCHECK(mscclppP2pConnectionSetupStart(&localInfo, &connection));
      break;
    case mscclppTransportIB:
      MSCCLPPCHECK(mscclppIbConnectionSetupStart(&localInfo, &connection));
      break;
    default:
      break;
    }
    // TODO: from saemal: do we possibly deadlock if there are too many outstanding sends?
    const int peerRank = connection.devConn->remoteRank;
    const int peerTag = connection.devConn->tag;
    MSCCLPPCHECK(localInfo.sendOverBootstrap(comm->bootstrap, peerRank, peerTag));
  }

  // Recv info from peers
  for (int idx = 0; idx < comm->nConns; ++idx) {
    struct mscclppConn& connection = comm->conns[idx];
    struct connInfo peerInfo;
    MSCCLPPCHECK(peerInfo.recvOverBootstrap(comm->bootstrap, connection.devConn->remoteRank, connection.devConn->tag));
    switch (connection.transport) {
    case mscclppTransportP2P:
      MSCCLPPCHECK(mscclppP2pConnectionSetupEnd(&peerInfo, &connection));
      break;
    case mscclppTransportIB:
      MSCCLPPCHECK(mscclppIbConnectionSetupEnd(&peerInfo, &connection));
      break;
    default:
      break;
    }
  }

  // a barrier to ensure setup on all gpus are done and we can return to the user
  MSCCLPPCHECK(mscclppBootstrapBarrier(comm));
  return mscclppSuccess;
}
// Per-connection payload exchanged by mscclppRegisterBuffer(): only the member matching the
// connection's transport is filled in by the sender.
struct bufferInfo
{
// CUDA IPC handle of the registered buffer (set for P2P connections).
cudaIpcMemHandle_t handleBuff;
// IB memory-region info of the registered buffer (set for IB connections).
mscclpp::IbMrInfo infoBuffMr;
};
// Registers `local_memory` (of `size` bytes) with every connection of `comm` and exchanges the
// registration with each peer. One entry per connection is appended to `regMem->p2p`: for P2P
// connections it carries the peer's buffer opened through CUDA IPC, for IB connections it carries
// the local memory region created for that connection.
MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size,
                                                  mscclppRegisteredMemory* regMem)
{
  // One slot per connection, so the receive loop can look up the region by connection index; slots
  // of non-IB connections stay null.
  std::vector<const mscclpp::IbMr*> ibMrs(static_cast<size_t>(comm->nConns), nullptr);
  for (int i = 0; i < comm->nConns; ++i) {
    struct mscclppConn* conn = &comm->conns[i];
    struct bufferInfo bInfo;

    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
    if (conn->transport == mscclppTransportP2P) {
      CUDACHECK(cudaIpcGetMemHandle(&bInfo.handleBuff, local_memory));
    } else if (conn->transport == mscclppTransportIB) {
      const mscclpp::IbMr* ibBuffMr = conn->ibCtx->registerMr(local_memory, size);
      bInfo.infoBuffMr = ibBuffMr->getInfo();
      ibMrs[i] = ibBuffMr;
    }

    MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));
  }

  // Recv info from peers
  for (int i = 0; i < comm->nConns; ++i) {
    struct mscclppConn* conn = &comm->conns[i];
    struct bufferInfo bInfo;

    mscclppRegisteredMemoryP2P p2p;
    p2p.IbMr = NULL;
    p2p.remoteBuff = NULL;

    MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));

    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
    if (conn->transport == mscclppTransportP2P) {
      CUDACHECK(cudaIpcOpenMemHandle((void**)&p2p.remoteBuff, bInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess));
    } else if (conn->transport == mscclppTransportIB) {
      p2p.IbMr = ibMrs[i];
    }

    regMem->p2p.push_back(p2p);
  }

  return mscclppSuccess;
}
// Copies `size` bytes from `srcBuff` into the peer buffer of every connection, using the remote
// pointers stored in `regMem`. Only P2P connections are supported; the first non-P2P connection
// encountered makes the call return mscclppInternalError.
//
// stream: CUDA stream (passed as an integer) on which the asynchronous copies are enqueued.
//
// NOTE(review): srcOffset and dstOffset are accepted but not applied to the P2P copy — confirm
// whether callers expect them to be honored.
MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem,
                                                         void* srcBuff, size_t size, uint32_t srcOffset,
                                                         uint32_t dstOffset, int64_t stream)
{
  // TODO: transport should be an argument too so user can decide which transport to use
  for (int i = 0; i < comm->nConns; ++i) {
    struct mscclppConn* conn = &comm->conns[i];
    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
    if (conn->transport != mscclppTransportP2P) {
      // TODO: implement the IB path (Olli: probably by including the relevant ibBuffMr in the
      // mscclppRegisteredMemory) using ibQp->stageSend()/postSend() with srcOffset/dstOffset.
      WARN("mscclppRegisteredBufferWrite not implemented for IB");
      return mscclppInternalError;
    }
    void* dstBuff = regMem->p2p[i].remoteBuff;
    CUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, size, cudaMemcpyDeviceToDevice, (cudaStream_t)stream));
  }

  return mscclppSuccess;
}
// TODO: destroy registered buffer
// Resets the NPKit request-id bookkeeping (a no-op unless ENABLE_NPKIT is defined) and then calls
// mscclppProxyCreate() for the communicator.
MSCCLPP_API mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm)
{
npkitInitReqIds(comm);
MSCCLPPCHECK(mscclppProxyCreate(comm));
return mscclppSuccess;
}
// Synchronizes all ranks by running an all-gather of one int per rank over the bootstrap network.
// The gathered values themselves are never read.
MSCCLPP_API mscclppResult_t mscclppBootstrapBarrier(mscclppComm_t comm)
{
  // A vector owns the scratch buffer, so it is released even when MSCCLPPCHECK returns early.
  std::vector<int> tmp(comm->nRanks);
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp.data(), sizeof(int)));
  return mscclppSuccess;
}
// Waits at a bootstrap barrier and then calls mscclppProxyDestroy() for the communicator.
MSCCLPP_API mscclppResult_t mscclppProxyStop(mscclppComm_t comm)
{
// a barrier to make sure all ranks are done with their work before stopping the proxy
MSCCLPPCHECK(mscclppBootstrapBarrier(comm));
MSCCLPPCHECK(mscclppProxyDestroy(comm));
return mscclppSuccess;
}
// Stores the communicator's rank in `*rank`.
// Returns mscclppInvalidUsage when either argument is null.
MSCCLPP_API mscclppResult_t mscclppCommRank(mscclppComm_t comm, int* rank)
{
  const bool argsValid = (comm != NULL) && (rank != NULL);
  if (argsValid) {
    *rank = comm->rank;
    return mscclppSuccess;
  }
  WARN("comm or rank cannot be null");
  return mscclppInvalidUsage;
}
// Stores the communicator's number of ranks in `*size`.
// Returns mscclppInvalidUsage when either argument is null.
MSCCLPP_API mscclppResult_t mscclppCommSize(mscclppComm_t comm, int* size)
{
  const bool argsValid = (comm != NULL) && (size != NULL);
  if (argsValid) {
    *size = comm->nRanks;
    return mscclppSuccess;
  }
  WARN("comm or size cannot be null");
  return mscclppInvalidUsage;
}
// Public entry point that forwards `msg` to mscclppDebugDefaultLogHandler().
MSCCLPP_API void mscclppDefaultLogHandler(const char* msg)
{
mscclppDebugDefaultLogHandler(msg);
}
// Public entry point that forwards `handler` to mscclppDebugSetLogHandler() and returns its result.
MSCCLPP_API mscclppResult_t mscclppSetLogHandler(mscclppLogHandler_t handler)
{
return mscclppDebugSetLogHandler(handler);
}
// Passes `timeout` to setBootstrapConnectionTimeoutConfig() on the mscclppConfig singleton.
// Always returns mscclppSuccess.
// NOTE(review): the unit of `timeout` is not visible here — confirm against mscclppConfig.
MSCCLPP_API mscclppResult_t mscclppSetBootstrapConnTimeout(int timeout)
{
  mscclppConfig::getInstance()->setBootstrapConnectionTimeoutConfig(timeout);
  return mscclppSuccess;
}

View File

@@ -3,7 +3,7 @@
#include <unistd.h>
#include "alloc.h"
#include "npkit/npkit.h"
#include "npkit.h"
#include <cuda_runtime.h>
uint64_t NpKit::rank_ = 0;

View File

@@ -3,8 +3,8 @@
#include <string>
#include "npkit/npkit_event.h"
#include "npkit/npkit_struct.h"
#include "npkit_event.h"
#include "npkit_struct.h"
class NpKit
{

View File

@@ -1,213 +1,112 @@
#include "alloc.h"
#include "checks.h"
#include "comm.h"
#include "debug.h"
#include "ib.hpp"
#include "socket.h"
#include <emmintrin.h>
#include <map>
#include <sys/syscall.h>
#include "api.h"
#include <mscclpp/core.hpp>
#include <mscclpp/proxy.hpp>
#include "utils.h"
#include "utils.hpp"
#include <atomic>
#include <thread>
#define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100
namespace mscclpp {
#define PROXYCUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
WARN("CUDA error from proxy: %s", cudaGetErrorString(err)); \
return NULL; \
} \
} while (false)
const int ProxyStopCheckPeriod = 1000;
#define PROXYMSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
if (mscclppDebugNoWarn == 0) \
INFO(MSCCLPP_ALL, "%s:%d -> %d", __FILE__, __LINE__, res); \
return NULL; \
} \
} while (0);
const int ProxyFlushPeriod = 4;
struct proxyArgs
struct Proxy::Impl
{
struct mscclppComm* comm;
struct mscclppProxyState* proxyState;
ProxyHandler handler;
std::function<void()> threadInit;
HostProxyFifo fifo;
std::thread service;
std::atomic_bool running;
Impl(ProxyHandler handler, std::function<void()> threadInit)
: handler(handler), threadInit(threadInit), running(false)
{
}
};
mscclppResult_t mscclppProxyFifo::create()
MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function<void()> threadInit)
{
MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoHead, 1));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(
mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE, &this->triggerFifoDesc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc));
#else
MSCCLPPCHECK(mscclppCudaHostCalloc(&this->triggerFifo, MSCCLPP_PROXY_FIFO_SIZE));
MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoTailDev, 1));
#endif
CUDACHECK(cudaStreamCreateWithFlags(&this->stream, cudaStreamNonBlocking));
this->fifoTailHost = 0;
return mscclppSuccess;
pimpl = std::make_unique<Impl>(handler, threadInit);
}
mscclppResult_t mscclppProxyFifo::destroy()
MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler) : Proxy(handler, [] {})
{
MSCCLPPCHECK(mscclppCudaFree(this->fifoHead));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(mscclppGdrCudaFree(this->triggerFifoDesc));
MSCCLPPCHECK(mscclppGdrCudaFree(this->fifoTailDesc));
#else
MSCCLPPCHECK(mscclppCudaHostFree(this->triggerFifo));
MSCCLPPCHECK(mscclppCudaFree(this->fifoTailDev));
#endif
CUDACHECK(cudaStreamDestroy(this->stream));
return mscclppSuccess;
}
// return true if the trigger is valid
mscclppResult_t mscclppProxyFifo::poll(mscclppTrigger* trigger)
MSCCLPP_API_CPP Proxy::~Proxy()
{
__m128i xmm0 = _mm_load_si128((__m128i*)&this->triggerFifo[this->fifoTailHost % MSCCLPP_PROXY_FIFO_SIZE]);
_mm_store_si128((__m128i*)trigger, xmm0);
return mscclppSuccess;
}
mscclppResult_t mscclppProxyFifo::pop()
{
*(volatile uint64_t*)(&this->triggerFifo[this->fifoTailHost % MSCCLPP_PROXY_FIFO_SIZE]) = 0;
(this->fifoTailHost)++;
return mscclppSuccess;
}
mscclppResult_t mscclppProxyFifo::flushTail(bool sync)
{
// Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
#if defined(MSCCLPP_USE_GDRCOPY)
*(volatile uint64_t*)(this->fifoTailDevHostPtr) = this->fifoTailHost;
#else
CUDACHECK(
cudaMemcpyAsync(this->fifoTailDev, &(this->fifoTailHost), sizeof(uint64_t), cudaMemcpyHostToDevice, this->stream));
if (sync) {
CUDACHECK(cudaStreamSynchronize(this->stream));
}
#endif
return mscclppSuccess;
}
static void processTrigger(const mscclppTrigger trigger, mscclppConn* conn)
{
// Iterate over what send is needed
if (trigger.fields.type & mscclppData) {
conn->hostConn->put(trigger.fields.dstDataOffset, trigger.fields.srcDataOffset, trigger.fields.dataSize);
}
if (trigger.fields.type & mscclppFlag) {
conn->hostConn->signal();
}
// Wait for completion
if (trigger.fields.type & mscclppSync) {
conn->hostConn->flush();
if (pimpl) {
stop();
}
}
// NOTE(review): this span appears to be diff residue that interleaves the pthread-style entry
// point mscclppProxyService() with Proxy::start(). Statements from both bodies alternate, and
// `fifo`, `trigger`, `runCnt` and `flushCnt` are each declared twice, so it is not compilable as
// written. Confirm against the real file before editing.
//
// Both variants have the same outline: poll the proxy FIFO, skip empty triggers, act on a
// trigger, pop it, flush the FIFO tail periodically or on request, and issue a final
// flushTail(/*sync=*/true) once the loop is left.
void* mscclppProxyService(void* _args)
MSCCLPP_API_CPP void Proxy::start()
{
struct proxyArgs* args = (struct proxyArgs*)_args;
struct mscclppComm* comm = args->comm;
struct mscclppProxyState* proxyState = args->proxyState;
free(_args); // allocated in mscclppProxyCreate
pimpl->running = true;
pimpl->service = std::thread([this] {
pimpl->threadInit();
// from this point on, proxy thread will stay close to the device
PROXYMSCCLPPCHECK(numaBind(comm->devNumaNode));
ProxyHandler handler = this->pimpl->handler;
HostProxyFifo& fifo = this->pimpl->fifo;
std::atomic_bool& running = this->pimpl->running;
ProxyTrigger trigger;
struct mscclppProxyFifo* fifo = &proxyState->fifo;
volatile mscclppProxyRunState_t* run = &proxyState->run;
mscclppTrigger trigger;
int runCnt = ProxyStopCheckPeriod;
uint64_t flushCnt = 0;
for (;;) {
// The running flag is only examined when runCnt reaches zero, i.e. not on every iteration.
if (runCnt-- == 0) {
runCnt = ProxyStopCheckPeriod;
if (!running) {
break;
}
}
// Poll to see if we are ready to send anything
fifo.poll(&trigger);
if (trigger.fst == 0) { // TODO: this check is a potential pitfall for custom triggers
continue; // there is one in progress
}
int runCnt = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
uint64_t flushCnt = 0;
for (;;) {
if (runCnt-- == 0) {
runCnt = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) {
ProxyHandlerResult result = handler(trigger);
// Send completion: reset only the high 64 bits
fifo.pop();
// Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
if ((++flushCnt % ProxyFlushPeriod) == 0 || result == ProxyHandlerResult::FlushFifoTailAndContinue) {
// TODO: relocate this check: || (trigger.fields.type & mscclppSync)
fifo.flushTail();
}
if (result == ProxyHandlerResult::Stop) {
break;
}
}
// Poll to see if we are ready to send anything
PROXYMSCCLPPCHECK(fifo->poll(&trigger));
if (trigger.value[0] == 0) {
continue; // there is one in progress
}
// The trigger's connId field indexes comm->conns to find the connection to act on.
mscclppConn* conn = &comm->conns[trigger.fields.connId];
processTrigger(trigger, conn);
// Send completion: reset only the high 64 bits
PROXYMSCCLPPCHECK(fifo->pop());
// Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
if (((++flushCnt % MSCCLPP_PROXY_FIFO_FLUSH_COUNTER) == 0) || (trigger.fields.type & mscclppSync)) {
PROXYMSCCLPPCHECK(fifo->flushTail());
}
}
// make sure the tail is flushed before we shut the proxy
PROXYMSCCLPPCHECK(fifo->flushTail(/*sync=*/true));
// A proxy state without an ibContext is treated as a P2P proxy; its p2pStream is synchronized.
bool isP2pProxy = (proxyState->ibContext == nullptr);
if (isP2pProxy) {
cudaStream_t p2pStream = proxyState->p2pStream;
PROXYCUDACHECK(cudaStreamSynchronize(p2pStream));
}
*run = MSCCLPP_PROXY_RUN_STATE_IDLE;
return NULL;
// make sure the tail is flushed before we shut the proxy
fifo.flushTail(/*sync=*/true);
// TODO: do these need to run?
// bool isP2pProxy = (proxyState->ibContext == nullptr);
// if (isP2pProxy) {
// cudaStream_t p2pStream = proxyState->p2pStream;
// PROXYCUDACHECK(cudaStreamSynchronize(p2pStream));
// }
});
}
// NOTE(review): this span appears to be diff residue that interleaves the C-style
// mscclppProxyCreate() with Proxy::stop(); the braces do not balance as written. Confirm against
// the real file before editing.
//
// mscclppProxyCreate lines: for each non-null comm->proxyState[i] (the scan ends at the first
// NULL entry), allocate a proxyArgs, mark the state RUNNING, create a pthread running
// mscclppProxyService, and name the thread according to the transport type.
// Proxy::stop lines: clear the running flag, then join the service thread if it is joinable.
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm)
MSCCLPP_API_CPP void Proxy::stop()
{
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState* proxyState = comm->proxyState[i];
if (proxyState == NULL)
break;
struct proxyArgs* args;
MSCCLPPCHECK(mscclppCalloc(&args, 1));
args->comm = comm;
args->proxyState = proxyState;
proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
// NOTE(review): the return value of pthread_create is not checked here.
pthread_create(&proxyState->thread, NULL, mscclppProxyService, args);
if (proxyState->transportType == mscclppTransportP2P) {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);
} else if (proxyState->transportType == mscclppTransportIB) {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i);
}
pimpl->running = false;
if (pimpl->service.joinable()) {
pimpl->service.join();
}
return mscclppSuccess;
}
// NOTE(review): this span appears to be diff residue that interleaves the C-style
// mscclppProxyDestroy() with Proxy::fifo(); it contains two consecutive return statements of
// different types. Confirm against the real file before editing.
//
// mscclppProxyDestroy lines: for each non-null comm->proxyState[i] that is not IDLE, set the run
// state to EXITING and sleep in 1000-microsecond steps until the state changes or
// *comm->abortFlag becomes non-zero.
// Proxy::fifo lines: return a reference to the FIFO held by the implementation object.
mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm)
MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo()
{
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState* proxyState = comm->proxyState[i];
if (proxyState == NULL)
break;
volatile int* run = (volatile int*)&proxyState->run;
if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) {
continue;
}
*run = MSCCLPP_PROXY_RUN_STATE_EXITING;
while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) {
usleep(1000);
}
}
return mscclppSuccess;
return pimpl->fifo;
}
} // namespace mscclpp

View File

@@ -1,112 +0,0 @@
#include "api.h"
#include <mscclpp/core.hpp>
#include <mscclpp/proxy.hpp>
#include "utils.h"
#include "utils.hpp"
#include <atomic>
#include <thread>
namespace mscclpp {
/// Reload value of the countdown in the service loop; the running flag is re-read only when
/// that countdown reaches zero.
constexpr int ProxyStopCheckPeriod = 1000;

/// The FIFO tail is flushed whenever the count of handled triggers is a multiple of this value.
constexpr int ProxyFlushPeriod = 4;
/// Private state of a Proxy, held through the `pimpl` pointer.
struct Proxy::Impl
{
  /// Callback invoked by the service thread for every non-empty trigger.
  ProxyHandler handler;
  /// Callback run once at the start of the service thread, before the polling loop.
  std::function<void()> threadInit;
  /// FIFO polled by the service thread.
  HostProxyFifo fifo;
  /// The service thread; assigned in start(), joined in stop().
  std::thread service;
  /// Set to true by start() and to false by stop(); the service loop exits when it reads false.
  std::atomic_bool running{false};

  /// Stores copies of the two callbacks; the proxy starts out not running.
  Impl(ProxyHandler handlerFn, std::function<void()> threadInitFn)
    : handler(handlerFn), threadInit(threadInitFn)
  {
  }
};
/// Creates a proxy that is not yet running.
///
/// @param handler    Callback invoked by the service thread for each non-empty trigger.
/// @param threadInit Callback run once on the service thread before it starts polling.
MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function<void()> threadInit)
  : pimpl(std::make_unique<Impl>(handler, threadInit))
{
}
/// Creates a proxy whose service thread needs no initialization: delegates to the two-argument
/// constructor with a callback that does nothing.
///
/// @param handler Callback invoked by the service thread for each non-empty trigger.
MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler)
  : Proxy(handler, []() {})
{
}
/// Stops the service thread (see stop()) before the implementation object is destroyed.
MSCCLPP_API_CPP Proxy::~Proxy()
{
  // Nothing to shut down when there is no implementation object.
  if (!pimpl) {
    return;
  }
  stop();
}
/// Launches the proxy service thread.
///
/// The thread runs the `threadInit` callback once and then loops: it polls the FIFO, hands each
/// non-empty trigger to the handler, pops it, and flushes the FIFO tail periodically or when the
/// handler asks for it. The loop ends when stop() clears the running flag (the flag is only
/// checked periodically, see ProxyStopCheckPeriod) or when the handler returns
/// ProxyHandlerResult::Stop. A final synchronous tail flush is issued before the thread exits.
///
/// If a service thread from an earlier start() is still joinable, it is stopped and joined
/// first. This covers both a thread that is still polling and one that already exited because
/// the handler returned Stop but was never joined.
MSCCLPP_API_CPP void Proxy::start()
{
  // Move-assigning onto a joinable std::thread calls std::terminate(), so make sure any previous
  // service thread has been joined before a new one is installed.
  if (pimpl->service.joinable()) {
    stop();
  }

  pimpl->running = true;
  pimpl->service = std::thread([this] {
    pimpl->threadInit();

    ProxyHandler handler = this->pimpl->handler;
    HostProxyFifo& fifo = this->pimpl->fifo;
    std::atomic_bool& running = this->pimpl->running;
    ProxyTrigger trigger;

    int runCnt = ProxyStopCheckPeriod;
    uint64_t flushCnt = 0;
    for (;;) {
      // Only look at the running flag when the countdown expires, not on every iteration.
      if (runCnt-- == 0) {
        runCnt = ProxyStopCheckPeriod;
        if (!running) {
          break;
        }
      }

      // Poll to see if we are ready to send anything
      fifo.poll(&trigger);
      if (trigger.fst == 0) { // TODO: this check is a potential pitfall for custom triggers
        continue; // there is one in progress
      }

      ProxyHandlerResult result = handler(trigger);

      // Send completion: reset only the high 64 bits
      fifo.pop();

      // Flush the tail to device memory. This happens either every ProxyFlushPeriod handled
      // triggers, so that the fifo can make progress even without an explicit flush request, or
      // when the handler requests it by returning FlushFifoTailAndContinue.
      if ((++flushCnt % ProxyFlushPeriod) == 0 || result == ProxyHandlerResult::FlushFifoTailAndContinue) {
        // TODO: relocate this check: || (trigger.fields.type & mscclppSync)
        fifo.flushTail();
      }

      if (result == ProxyHandlerResult::Stop) {
        break;
      }
    }

    // make sure the tail is flushed before we shut the proxy
    fifo.flushTail(/*sync=*/true);
    // TODO: do these need to run?
    // bool isP2pProxy = (proxyState->ibContext == nullptr);
    // if (isP2pProxy) {
    //   cudaStream_t p2pStream = proxyState->p2pStream;
    //   PROXYCUDACHECK(cudaStreamSynchronize(p2pStream));
    // }
  });
}
/// Asks the service loop to exit and waits for the service thread to finish.
///
/// Clears the running flag, then joins the thread if it is joinable; when no thread is joinable
/// only the flag is cleared.
MSCCLPP_API_CPP void Proxy::stop()
{
  pimpl->running = false;

  std::thread& worker = pimpl->service;
  if (!worker.joinable()) {
    return;
  }
  worker.join();
}
/// Returns a reference to the FIFO that the service thread polls; the FIFO is a member of the
/// proxy's implementation object.
MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo()
{
  HostProxyFifo& queue = pimpl->fifo;
  return queue;
}
} // namespace mscclpp