This commit is contained in:
Saeed Maleki
2023-04-08 01:57:22 +00:00
parent e303a532ff
commit ec83a27e83
5 changed files with 44 additions and 28 deletions

View File

@@ -452,9 +452,12 @@ mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSo
static mscclppResult_t socketTryAccept(struct mscclppSocket* sock)
{
static time_t initTime = -1;
if (initTime == -1)
initTime = clockNano() / 1e9;
static bool timeInitialized = false;
static mscclppTime_t initTime;
if (!timeInitialized){
timeInitialized = true;
initTime = getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
@@ -462,14 +465,14 @@ static mscclppResult_t socketTryAccept(struct mscclppSocket* sock)
sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen);
if (sock->fd != -1) {
sock->state = mscclppSocketStateAccepted;
initTime = -1;
timeInitialized = false;
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno);
initTime = -1;
timeInitialized = false;
return mscclppSystemError;
} else if ((clockNano() / 1e9) - initTime > acceptTimeout) {
} else if (elapsedClock(getClock(), initTime) > acceptTimeout) {
WARN("socketTryAccept: exceeded timeout (%ld) sec", acceptTimeout);
initTime = -1;
timeInitialized = false;
return mscclppRemoteError;
} else {
usleep(SLEEP_INT);
@@ -513,10 +516,13 @@ static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock)
static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
{
static time_t initTime = -1;
if (initTime == -1) {
initTime = clockNano() / 1e9;
static bool timeInitialized = false;
static mscclppTime_t initTime;
if (!timeInitialized){
timeInitialized = true;
initTime = getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
@@ -524,16 +530,16 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
int ret = connect(sock->fd, &sock->addr.sa, sock->salen);
if (ret == 0) {
sock->state = mscclppSocketStateConnected;
initTime = -1;
timeInitialized = false;
return mscclppSuccess;
} else if (errno == EINPROGRESS) {
sock->state = mscclppSocketStateConnectPolling;
return mscclppSuccess;
} else if (errno == ECONNREFUSED || errno == ETIMEDOUT) {
if ((clockNano() / 1e9) - initTime > acceptTimeout) {
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
WARN("socketStartConnect: exceeded timeout (%ld) sec", acceptTimeout);
sock->state = mscclppSocketStateError;
initTime = -1;
timeInitialized = false;
return mscclppRemoteError;
}
usleep(SLEEP_INT);
@@ -544,17 +550,20 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
char line[SOCKET_NAME_MAXLEN + 1];
sock->state = mscclppSocketStateError;
WARN("socketStartConnect: Connect to %s failed : %s", mscclppSocketToString(&sock->addr, line), strerror(errno));
initTime = -1;
timeInitialized = false;
return mscclppSystemError;
}
}
static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
{
static time_t initTime = -1;
if (initTime == -1) {
initTime = clockNano() / 1e9;
static bool timeInitialized = false;
static mscclppTime_t initTime;
if (!timeInitialized){
timeInitialized = true;
initTime = getClock();
}
mscclppConfig* config = mscclppConfig::getInstance();
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
@@ -567,7 +576,7 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
pfd.events = POLLOUT;
SYSCHECK(ret = poll(&pfd, 1, timeout), "poll");
if (ret == 0) {
initTime = -1;
timeInitialized = false;
return mscclppSuccess;
}
@@ -576,10 +585,10 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, (void*)&ret, &rlen), "getsockopt");
if (ret == 0) {
initTime = -1;
timeInitialized = false;
sock->state = mscclppSocketStateConnected;
} else if (ret == ECONNREFUSED || ret == ETIMEDOUT) {
if ((clockNano() / 1e9) - initTime > acceptTimeout) {
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
WARN("socketPollConnect: exceeded timeout (%ld) sec", acceptTimeout);
sock->state = mscclppSocketStateError;
return mscclppRemoteError;

View File

@@ -16,8 +16,6 @@
#include <sys/mman.h>
#include <unistd.h>
uint64_t clockNano(); // from utils.h with which we have a circular dependency
template <typename T> mscclppResult_t mscclppCudaHostCallocDebug(T** ptr, size_t nelem, const char* filefunc, int line)
{
mscclppResult_t result = mscclppSuccess;

View File

@@ -16,6 +16,7 @@
#include <stdint.h>
#include <time.h>
// int mscclppCudaCompCap();
// PCI Bus ID <-> int64 conversion functions
@@ -48,12 +49,9 @@ static long log2i(long n)
return l;
}
inline uint64_t clockNano()
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return uint64_t(ts.tv_sec) * 1000 * 1000 * 1000 + ts.tv_nsec;
}
typedef std::chrono::steady_clock::time_point mscclppTime_t;
mscclppTime_t getClock();
int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end);
/* get any bytes of random data from /dev/urandom, return 0 if it succeeds; else
* return -1 */

View File

@@ -270,3 +270,13 @@ mscclppResult_t setNumaState(mscclppNumaState state)
numa_bind(state);
return mscclppSuccess;
}
inline mscclppTime_t getClock()
{
return std::chrono::steady_clock::now();
}
inline int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end)
{
return std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
}

View File

@@ -174,6 +174,7 @@ testResult_t AllGatherInitData(struct testArgs* args, int in_place)
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
delete dataHost;
CUDACHECK(cudaDeviceSynchronize());
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
return testSuccess;
}