Update unit tests (#81)

Changho Hwang
2023-06-08 17:58:05 +08:00
committed by GitHub
parent 0c14a67ad2
commit 798631bd52
29 changed files with 1220 additions and 661 deletions

.azure-pipelines/ut.yml (new file)

@@ -0,0 +1,50 @@
trigger:
- main
pr:
- main
jobs:
- job: UnitTest
timeoutInMinutes: 30
pool:
name: mscclpp
container:
image: superbench/superbench:v0.8.0-cuda12.1
options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1
steps:
- task: Bash@3
name: Build
displayName: Build
inputs:
targetType: 'inline'
script: |
curl -L -C- https://github.com/Kitware/CMake/releases/download/v3.26.4/cmake-3.26.4-linux-x86_64.tar.gz -o /tmp/cmake-3.26.4-linux-x86_64.tar.gz
tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda-12.1/compat/lib.real
mkdir build && cd build
/tmp/cmake-3.26.4-linux-x86_64/bin/cmake ..
make -j
workingDirectory: '$(System.DefaultWorkingDirectory)'
- task: Bash@3
name: UnitTests
displayName: Run mscclpp unit tests
inputs:
targetType: 'inline'
script: |
./build/test/unit_tests
workingDirectory: '$(System.DefaultWorkingDirectory)'
- task: Bash@3
name: MpUnitTests
displayName: Run mscclpp multi-process unit tests
inputs:
targetType: 'inline'
script: |
mpirun -tag-output -np 2 ./build/test/mp_unit_tests
mpirun -tag-output -np 4 ./build/test/mp_unit_tests
mpirun -tag-output -np 8 ./build/test/mp_unit_tests
workingDirectory: '$(System.DefaultWorkingDirectory)'


@@ -23,7 +23,7 @@ jobs:
- name: Run cpplint
run: |
CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*" -not -path "./test/*")
CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*")
PYTHONCPPSOURCES=$(find ./python/src/ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)')
clang-format-12 -style=file --verbose --Werror --dry-run ${CPPSOURCES}
clang-format-12 --dry-run ${PYTHONCPPSOURCES}
@@ -40,10 +40,11 @@ jobs:
- name: Check out Git repository
uses: actions/checkout@v3
- name: Install dependencies
- name: Download misspell
run: |
curl -L https://git.io/misspell | sudo bash -s -- -b /bin
curl -L https://github.com/client9/misspell/releases/download/v0.3.4/misspell_0.3.4_linux_64bit.tar.gz -o /tmp/misspell_0.3.4_linux_64bit.tar.gz
tar -xzf /tmp/misspell_0.3.4_linux_64bit.tar.gz -C .
- name: Check spelling
run: |
misspell -error .
./misspell -error .


@@ -131,9 +131,7 @@ struct DeviceChannel {
uint64_t curFifoHead = fifo_.push(
ChannelTrigger(TriggerData | TriggerFlag | TriggerSync, dst, dstOffset, src, srcOffset, size, channelId_)
.value);
while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo_.tailReplica <= curFifoHead)
;
fifo_.sync(curFifoHead);
}
__forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) {
@@ -142,11 +140,7 @@ struct DeviceChannel {
__forceinline__ __device__ void flush() {
uint64_t curFifoHead = fifo_.push(ChannelTrigger(TriggerSync, 0, 0, 0, 0, 1, channelId_).value);
// we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0.
while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo_.tailReplica <= curFifoHead)
;
fifo_.sync(curFifoHead);
}
__forceinline__ __device__ void wait() { epoch_.wait(); }


@@ -4,6 +4,7 @@
#include <memory>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <mscclpp/poll.hpp>
namespace mscclpp {
@@ -51,8 +52,7 @@ class DeviceEpoch : BaseEpoch<CudaDeleter> {
#ifdef __CUDACC__
__forceinline__ __device__ void wait() {
(*expectedInboundEpochId) += 1;
while (*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId))
;
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId), 1000000000);
}
__forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(epochIds->outbound) += 1; }
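
For context, the intended handshake around these device-side calls (exercised by the new WriteWithDeviceEpochs test later in this diff) is: the sender bumps its outbound epoch on the GPU and the host then calls DeviceEpoch::signal(), while the receiver spins in wait() until its inbound replica catches up. A condensed, illustrative-only sketch using the same handle type as the tests:

// Hypothetical kernels, not part of this commit; the handles are assumed to have been copied to device memory.
__global__ void notifyPeer(mscclpp::DeviceEpoch::DeviceHandle* epoch) {
  epoch->epochIncrement();  // bump the outbound epoch; the host side follows up with DeviceEpoch::signal()
}
__global__ void waitForPeer(mscclpp::DeviceEpoch::DeviceHandle* epoch) {
  epoch->wait();  // spins via POLL_MAYBE_JAILBREAK until the inbound replica reaches the expected epoch
}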


@@ -12,6 +12,7 @@ enum class ErrorCode {
SystemError,
InternalError,
InvalidUsage,
Timeout,
};
std::string errorToString(enum ErrorCode error);


@@ -1,18 +1,15 @@
#ifndef MSCCLPP_FIFO_HPP_
#define MSCCLPP_FIFO_HPP_
#include <stdint.h>
#include <cstdint>
#include <functional>
#include <memory>
#include <mscclpp/poll.hpp>
#define MSCCLPP_PROXY_FIFO_SIZE 128
namespace mscclpp {
// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
#define MSCCLPP_PROXY_FIFO_SIZE 128
#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4
struct alignas(16) ProxyTrigger {
uint64_t fst, snd;
};
@@ -34,14 +31,23 @@ struct DeviceProxyFifo {
#ifdef __CUDACC__
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger) {
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);
while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica))
;
while (*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0)
;
POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica), 1000000000);
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0, 1000000000);
ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]);
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
return curFifoHead;
}
__forceinline__ __device__ void sync(uint64_t curFifoHead) {
// We need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go past curFifoHead (this is a safety net) and (2) wait for the work element value to change to 0.
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]) != 0 &&
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead,
1000000000);
}
#endif // __CUDACC__
ProxyTrigger* triggers; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements
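
The push()/sync() pair above is what DeviceChannel now delegates to in place of its old open-coded spin loops. A minimal illustrative sketch (hypothetical device helper, not part of this commit):

// Push one trigger into the proxy fifo and block until the host proxy has consumed it.
__device__ void pushAndWait(mscclpp::DeviceProxyFifo& fifo, mscclpp::ProxyTrigger trigger) {
  uint64_t curFifoHead = fifo.push(trigger);  // may spin while the fifo is full
  fifo.sync(curFifoHead);                     // waits until the host proxy is done with this element (see sync() above)
}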

include/mscclpp/poll.hpp (new file)

@@ -0,0 +1,43 @@
#ifndef MSCCLPP_POLL_HPP_
#define MSCCLPP_POLL_HPP_
#ifdef __CUDACC__
#ifndef NDEBUG
#include <stdio.h>
#define POLL_PRINT_ON_STUCK(__cond) \
do { \
printf("mscclpp: spin is stuck. condition: " #__cond "\n"); \
} while (0);
#else // NDEBUG
#define POLL_PRINT_ON_STUCK(__cond)
#endif // NDEBUG
// If a spin is stuck, escape from it and set status to 1.
#define POLL_MAYBE_JAILBREAK_ESCAPE(__cond, __max_spin_cnt, __status) \
do { \
uint64_t __spin_cnt = 0; \
__status = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
POLL_PRINT_ON_STUCK(__cond); \
__status = 1; \
break; \
} \
} \
} while (0);
// If a spin is stuck, print a warning and keep spinning.
#define POLL_MAYBE_JAILBREAK(__cond, __max_spin_cnt) \
do { \
uint64_t __spin_cnt = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
POLL_PRINT_ON_STUCK(__cond); \
} \
} \
} while (0);
#endif // __CUDACC__
#endif // MSCCLPP_POLL_HPP_
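
A brief usage sketch for the two macros (hypothetical device code, not part of this commit; the flag pointer and the 1e9 spin bound are made up):

#include <mscclpp/poll.hpp>

// Keep spinning on the condition; in debug builds a warning is printed once ~1e9 iterations pass.
__device__ void waitForFlag(volatile uint64_t* flag) {
  POLL_MAYBE_JAILBREAK(*flag == 0, 1000000000);
}

// Same condition, but escape the spin and report it through a status variable instead.
__device__ int waitForFlagOrGiveUp(volatile uint64_t* flag) {
  int status;
  POLL_MAYBE_JAILBREAK_ESCAPE(*flag == 0, 1000000000, status);
  return status;  // 0 if the flag was set, 1 if the spin bound was exceeded
}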


@@ -1,55 +1,37 @@
#ifndef MSCCLPP_UTILS_HPP_
#define MSCCLPP_UTILS_HPP_
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <mscclpp/errors.hpp>
#include <string>
namespace mscclpp {
struct Timer {
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point start_;
int timeout_;
Timer() { start = std::chrono::steady_clock::now(); }
Timer(int timeout = -1);
int64_t elapsed() {
auto end = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
~Timer();
void reset() { start = std::chrono::steady_clock::now(); }
int64_t elapsed() const;
void print(const char* name) {
auto end = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
printf("%s: %ld us\n", name, elapsed);
}
void set(int timeout);
void reset();
void print(const std::string& name);
};
struct ScopedTimer {
Timer timer;
const char* name;
struct ScopedTimer : public Timer {
const std::string name_;
ScopedTimer(const char* name) : name(name) {}
ScopedTimer(const std::string& name);
~ScopedTimer() { timer.print(name); }
~ScopedTimer();
};
inline std::string getHostName(int maxlen, const char delim) {
std::string hostname(maxlen + 1, '\0');
if (gethostname(const_cast<char*>(hostname.data()), maxlen) != 0) {
std::strncpy(const_cast<char*>(hostname.data()), "unknown", maxlen);
throw Error("gethostname failed", ErrorCode::SystemError);
}
int i = 0;
while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++;
hostname[i] = '\0';
return hostname;
}
std::string getHostName(int maxlen, const char delim);
} // namespace mscclpp


@@ -13,6 +13,8 @@ std::string errorToString(enum ErrorCode error) {
return "InternalError";
case ErrorCode::InvalidUsage:
return "InvalidUsage";
case ErrorCode::Timeout:
return "Timeout";
default:
return "UnknownError";
}


@@ -49,9 +49,8 @@ MSCCLPP_API_CPP void HostProxyFifo::pop() {
}
MSCCLPP_API_CPP void HostProxyFifo::flushTail(bool sync) {
// Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
// Flush the tail to device memory. This is triggered either every ProxyFlushPeriod, so that the fifo can make
// progress even when there is no mscclppSync request, or explicitly, since an mscclppSync-type request is a flush request.
MSCCLPP_CUDATHROW(cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t),
cudaMemcpyHostToDevice, pimpl->stream));
if (sync) {


@@ -35,18 +35,6 @@ using TimePoint = std::chrono::steady_clock::time_point;
TimePoint getClock();
int64_t elapsedClock(TimePoint start, TimePoint end);
/* get any bytes of random data from /dev/urandom */
inline void getRandomData(void* buffer, size_t bytes) {
if (bytes > 0) {
const size_t one = 1UL;
FILE* fp = fopen("/dev/urandom", "r");
if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) {
throw Error("Failed to read random data", ErrorCode::SystemError);
}
if (fp) fclose(fp);
}
}
} // namespace mscclpp
#endif


@@ -10,6 +10,8 @@ namespace mscclpp {
const int ProxyStopCheckPeriod = 1000;
// Unless explicitly requested, a flush of the tail to device memory is triggered for every ProxyFlushPeriod.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
const int ProxyFlushPeriod = 4;
struct Proxy::Impl {

src/utils.cc (new file)

@@ -0,0 +1,66 @@
#include <signal.h>
#include <unistd.h>
#include <chrono>
#include <iostream>
#include <mscclpp/errors.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
#include <string>
// Throw upon SIGALRM.
static void sigalrmTimeoutHandler(int) {
signal(SIGALRM, SIG_IGN);
throw mscclpp::Error("Timer timed out", mscclpp::ErrorCode::Timeout);
}
namespace mscclpp {
Timer::Timer(int timeout) { set(timeout); }
Timer::~Timer() {
if (timeout_ > 0) {
alarm(0);
signal(SIGALRM, SIG_DFL);
}
}
int64_t Timer::elapsed() const {
auto end = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
}
void Timer::set(int timeout) {
timeout_ = timeout;
if (timeout > 0) {
signal(SIGALRM, sigalrmTimeoutHandler);
alarm(timeout);
}
start_ = std::chrono::steady_clock::now();
}
void Timer::reset() { set(timeout_); }
void Timer::print(const std::string& name) {
auto us = elapsed();
std::stringstream ss;
ss << name << ": " << us << " us\n";
std::cout << ss.str();
}
ScopedTimer::ScopedTimer(const std::string& name) : name_(name) {}
ScopedTimer::~ScopedTimer() { print(name_); }
std::string getHostName(int maxlen, const char delim) {
std::string hostname(maxlen + 1, '\0');
if (gethostname(const_cast<char*>(hostname.data()), maxlen) != 0) {
throw Error("gethostname failed", ErrorCode::SystemError);
}
int i = 0;
while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++;
hostname[i] = '\0';
return hostname;
}
} // namespace mscclpp
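
A short usage sketch for the reworked Timer and ScopedTimer (illustrative only; the three-second bound and the timed work are invented for the example):

#include <mscclpp/errors.hpp>
#include <mscclpp/utils.hpp>

void timedWork() {
  try {
    mscclpp::Timer timer(3);    // arms SIGALRM; the handler above throws after 3 seconds
    // ... work that is expected to finish well within the bound ...
    timer.print("timedWork");   // prints the elapsed time in microseconds
  } catch (const mscclpp::Error&) {
    // reached only if the alarm fired; the error carries ErrorCode::Timeout
  }
}

void scopedTiming() {
  mscclpp::ScopedTimer timer("scopedTiming");  // prints the elapsed time when it goes out of scope
  // ... work ...
}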


@@ -171,4 +171,17 @@ TimePoint getClock() { return std::chrono::steady_clock::now(); }
int64_t elapsedClock(TimePoint start, TimePoint end) {
return std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
}
/* get any bytes of random data from /dev/urandom */
void getRandomData(void* buffer, size_t bytes) {
if (bytes > 0) {
const size_t one = 1UL;
FILE* fp = fopen("/dev/urandom", "r");
if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) {
throw Error("Failed to read random data", ErrorCode::SystemError);
}
if (fp) fclose(fp);
}
}
} // namespace mscclpp


@@ -9,17 +9,19 @@ function(add_test_executable name sources)
endif()
endfunction()
add_test_executable(bootstrap_test_cpp bootstrap_test_cpp.cc)
add_test_executable(communicator_test_cpp communicator_test_cpp.cu)
add_test_executable(allgather_test_cpp allgather_test_cpp.cu)
add_test_executable(allgather_test_host_offloading allgather_test_host_offloading.cu)
add_test_executable(ib_test ib_test.cc)
add_executable(mp_unit_tests mp_unit_tests.cu)
target_link_libraries(mp_unit_tests mscclpp CUDA::cudart CUDA::cuda_driver MPI::MPI_CXX GTest::gtest_main GTest::gmock_main)
target_include_directories(mp_unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include)
configure_file(run_mpi_test.sh.in run_mpi_test.sh)
# Unit tests
add_executable(unit_tests)
target_link_libraries(unit_tests GTest::gtest_main GTest::gmock_main mscclpp CUDA::cudart CUDA::cuda_driver)
target_include_directories(unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include)
add_subdirectory(unit) # This adds the sources to the unit_tests target
gtest_discover_tests(unit_tests DISCOVERY_MODE PRE_TEST)


@@ -1,122 +0,0 @@
#include <mpi.h>
#include <cassert>
#include <iostream>
#include <memory>
#include <mscclpp/core.hpp>
void test_allgather(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
std::vector<int> tmp(bootstrap->getNranks(), 0);
tmp[bootstrap->getRank()] = bootstrap->getRank() + 1;
bootstrap->allGather(tmp.data(), sizeof(int));
for (int i = 0; i < bootstrap->getNranks(); i++) {
assert(tmp[i] == i + 1);
}
if (bootstrap->getRank() == 0) std::cout << "AllGather test passed!" << std::endl;
}
void test_barrier(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "Barrier test passed!" << std::endl;
}
void test_sendrecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i) continue;
int msg1 = (bootstrap->getRank() + 1) * 3;
int msg2 = (bootstrap->getRank() + 1) * 3 + 1;
int msg3 = (bootstrap->getRank() + 1) * 3 + 2;
bootstrap->send(&msg1, sizeof(int), i, 0);
bootstrap->send(&msg2, sizeof(int), i, 1);
bootstrap->send(&msg3, sizeof(int), i, 2);
}
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i) continue;
int msg1 = 0;
int msg2 = 0;
int msg3 = 0;
// recv them in the opposite order to check correctness
bootstrap->recv(&msg2, sizeof(int), i, 1);
bootstrap->recv(&msg3, sizeof(int), i, 2);
bootstrap->recv(&msg1, sizeof(int), i, 0);
assert(msg1 == (i + 1) * 3);
assert(msg2 == (i + 1) * 3 + 1);
assert(msg3 == (i + 1) * 3 + 2);
}
if (bootstrap->getRank() == 0) std::cout << "Send/Recv test passed!" << std::endl;
}
void test_all(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
test_allgather(bootstrap);
test_barrier(bootstrap);
test_sendrecv(bootstrap);
}
void test_mscclpp_bootstrap_with_id(int rank, int worldSize) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
test_all(bootstrap);
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl;
}
void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPair) {
std::shared_ptr<mscclpp::Bootstrap> bootstrap(new mscclpp::Bootstrap(rank, worldSize));
bootstrap->initialize(ipPortPair);
test_all(bootstrap);
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl;
}
class MPIBootstrap : public mscclpp::BaseBootstrap {
public:
MPIBootstrap() : BaseBootstrap() {}
int getRank() override {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
int getNranks() override {
int worldSize;
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
return worldSize;
}
void allGather(void* sendbuf, int size) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD);
}
void barrier() override { MPI_Barrier(MPI_COMM_WORLD); }
void send(void* sendbuf, int size, int dest, int tag) override {
MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
}
void recv(void* recvbuf, int size, int source, int tag) override {
MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
};
void test_mpi_bootstrap() {
std::shared_ptr<mscclpp::BaseBootstrap> bootstrap(new MPIBootstrap());
test_all(bootstrap);
if (bootstrap->getRank() == 0) std::cout << "--- MPI Bootstrap test passed! ---" << std::endl;
}
int main(int argc, char** argv) {
int rank, worldSize;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
if (argc > 2) {
if (rank == 0) std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl;
MPI_Finalize();
return 0;
}
test_mscclpp_bootstrap_with_id(rank, worldSize);
if (argc == 2) test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]);
test_mpi_bootstrap();
MPI_Finalize();
return 0;
}


@@ -1,336 +0,0 @@
#include <cuda_runtime.h>
#include <mpi.h>
#include <cassert>
#include <iostream>
#include <memory>
#include <mscclpp/core.hpp>
#include <mscclpp/epoch.hpp>
#include <unordered_map>
#define CUDATHROW(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \
} \
} while (false)
mscclpp::Transport findIb(int localRank) {
mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2,
mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5,
mscclpp::Transport::IB6, mscclpp::Transport::IB7};
return IBs[localRank];
}
void register_all_memories(mscclpp::Communicator& communicator, int rank, int worldSize, void* devicePtr,
size_t deviceBufferSize, mscclpp::Transport myIbDevice,
mscclpp::RegisteredMemory& localMemory,
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteMemory) {
localMemory = communicator.registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice);
std::unordered_map<int, mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> futureRemoteMemory;
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
communicator.sendMemoryOnSetup(localMemory, i, 0);
futureRemoteMemory[i] = communicator.recvMemoryOnSetup(i, 0);
}
}
communicator.setup();
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
remoteMemory[i] = futureRemoteMemory[i].get();
}
}
}
void make_connections(mscclpp::Communicator& communicator, int rank, int worldSize, int nRanksPerNode,
mscclpp::Transport myIbDevice,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections) {
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
if (i / nRanksPerNode == rank / nRanksPerNode) {
connections[i] = communicator.connectOnSetup(i, 0, mscclpp::Transport::CudaIpc);
} else {
connections[i] = communicator.connectOnSetup(i, 0, myIbDevice);
}
}
}
communicator.setup();
}
void write_remote(int rank, int worldSize, std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteRegisteredMemories,
mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank) {
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
auto& conn = connections.at(i);
auto& peerMemory = remoteRegisteredMemories.at(i);
conn->write(peerMemory, rank * dataCountPerRank * sizeof(int), registeredMemory,
rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int));
conn->flush();
}
}
}
void device_buffer_init(int rank, int worldSize, int dataCount, std::vector<int*>& devicePtr) {
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
for (int i = 0; i < dataCount; i++) {
hostBuffer[i] = rank + n * worldSize;
}
CUDATHROW(cudaMemcpy(devicePtr[n], hostBuffer.data(), dataCount * sizeof(int), cudaMemcpyHostToDevice));
}
CUDATHROW(cudaDeviceSynchronize());
}
bool test_device_buffer_write_correctness(int rank, int worldSize, int nRanksPerNode, int dataCount,
std::vector<int*>& devicePtr, bool skipLocal = false) {
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], dataCount * sizeof(int), cudaMemcpyDeviceToHost));
for (int i = 0; i < worldSize; i++) {
if (i / nRanksPerNode == rank / nRanksPerNode && skipLocal) {
continue;
}
for (int j = i * dataCount / worldSize; j < (i + 1) * dataCount / worldSize; j++) {
if (hostBuffer[j] != i + n * worldSize) {
return false;
}
}
}
}
return true;
}
void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize,
std::shared_ptr<mscclpp::BaseBootstrap> bootstrap,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr, int numBuffers) {
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
}
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "RDMA write for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
// polling until it becomes ready
bool ready = false;
int niter = 0;
do {
ready = test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr);
niter++;
if (niter == 10000) {
throw std::runtime_error("Polling is stuck.");
}
} while (!ready);
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
if (bootstrap->getRank() == 0) std::cout << "--- Testing vanialla writes passed ---" << std::endl;
}
__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].epochIncrement();
}
}
__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].wait();
}
}
void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize,
mscclpp::Communicator& communicator,
std::shared_ptr<mscclpp::BaseBootstrap> bootstrap,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr,
int numBuffers) {
std::unordered_map<int, std::shared_ptr<mscclpp::DeviceEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
epochs.insert({entry.first, std::make_shared<mscclpp::DeviceEpoch>(communicator, conn)});
}
communicator.setup();
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl;
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
mscclpp::DeviceEpoch::DeviceHandle* deviceEpochHandles;
CUDATHROW(cudaMalloc(&deviceEpochHandles, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * worldSize));
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle();
CUDATHROW(cudaMemcpy(&deviceEpochHandles[i], &deviceHandle, sizeof(mscclpp::DeviceEpoch::DeviceHandle),
cudaMemcpyHostToDevice));
}
}
CUDATHROW(cudaDeviceSynchronize());
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "CUDA device epochs are created" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
}
increament_epochs<<<1, worldSize>>>(deviceEpochHandles, rank, worldSize);
CUDATHROW(cudaDeviceSynchronize());
for (int i = 0; i < worldSize; i++) {
if (i != rank) {
epochs[i]->signal();
}
}
wait_epochs<<<1, worldSize>>>(deviceEpochHandles, rank, worldSize);
CUDATHROW(cudaDeviceSynchronize());
if (!test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr)) {
throw std::runtime_error("unexpected result.");
}
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "--- Testing writes with device epochs for " << std::to_string(numBuffers) << " buffers passed ---"
<< std::endl;
}
void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize,
mscclpp::Communicator& communicator, std::shared_ptr<mscclpp::BaseBootstrap> bootstrap,
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections,
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>>& remoteMemory,
std::vector<mscclpp::RegisteredMemory>& localMemory, std::vector<int*>& devicePtr,
int numBuffers) {
std::unordered_map<int, std::shared_ptr<mscclpp::HostEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
if (conn->transport() == mscclpp::Transport::CudaIpc) continue;
epochs.insert({entry.first, std::make_shared<mscclpp::HostEpoch>(communicator, conn)});
}
communicator.setup();
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl;
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
size_t dataCount = deviceBufferSize / sizeof(int);
device_buffer_init(rank, worldSize, dataCount, devicePtr);
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl;
bootstrap->barrier();
if (bootstrap->getRank() == 0) std::cout << "Host epochs are created" << std::endl;
for (int n = 0; n < numBuffers; n++) {
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize);
}
for (int i = 0; i < worldSize; i++) {
if (i != rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
epochs[i]->incrementAndSignal();
}
}
for (int i = 0; i < worldSize; i++) {
if (i != rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
epochs[i]->wait();
}
}
if (!test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr, true)) {
throw std::runtime_error("unexpected result.");
}
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "--- Testing writes with host epochs for " << std::to_string(numBuffers) << " buffers passed ---"
<< std::endl;
}
void test_communicator(int rank, int worldSize, int nRanksPerNode) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
mscclpp::Communicator communicator(bootstrap);
if (bootstrap->getRank() == 0) std::cout << "Communicator initialization passed" << std::endl;
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>> connections;
auto myIbDevice = findIb(rank % nRanksPerNode);
make_connections(communicator, rank, worldSize, nRanksPerNode, myIbDevice, connections);
if (bootstrap->getRank() == 0) std::cout << "Connection setup passed" << std::endl;
int numBuffers = 10;
std::vector<int*> devicePtr(numBuffers);
int deviceBufferSize = 1024 * 1024;
std::vector<mscclpp::RegisteredMemory> localMemory(numBuffers);
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>> remoteMemory(numBuffers);
for (int n = 0; n < numBuffers; n++) {
if (n % 100 == 0) std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl;
CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize));
register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n],
remoteMemory[n]);
}
bootstrap->barrier();
if (bootstrap->getRank() == 0)
std::cout << "Memory registration for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
test_write(rank, worldSize, nRanksPerNode, deviceBufferSize, bootstrap, connections, remoteMemory, localMemory,
devicePtr, numBuffers);
test_write_with_device_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections,
remoteMemory, localMemory, devicePtr, numBuffers);
test_write_with_host_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections,
remoteMemory, localMemory, devicePtr, numBuffers);
if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl;
for (int n = 0; n < numBuffers; n++) {
CUDATHROW(cudaFree(devicePtr[n]));
}
}
int main(int argc, char** argv) {
int rank, worldSize;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
MPI_Comm shmcomm;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
int shmWorldSize;
MPI_Comm_size(shmcomm, &shmWorldSize);
int nRanksPerNode = shmWorldSize;
MPI_Comm_free(&shmcomm);
test_communicator(rank, worldSize, nRanksPerNode);
MPI_Finalize();
return 0;
}


@@ -1,98 +0,0 @@
#include "ib.hpp"
#include <array>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <string>
#include "checks_internal.hpp"
#include "infiniband/verbs.h"
// Measure current time in second.
static double getTime(void) {
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
exit(EXIT_FAILURE);
}
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}
// Example usage:
// Receiver: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 0 0 0
// Sender: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 1 0 0
int main(int argc, const char* argv[]) {
if (argc != 5) {
printf("Usage: %s <ip:port> <0(recv)/1(send)> <gpu id> <ib id>\n", argv[0]);
return 1;
}
const char* ipPortPair = argv[1];
int isSend = atoi(argv[2]);
int cudaDevId = atoi(argv[3]);
std::string ibDevName = "mlx5_ib" + std::string(argv[4]);
MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId));
int nelem = 1;
auto data = mscclpp::allocUniqueCuda<int>(nelem);
std::shared_ptr<mscclpp::Bootstrap> bootstrap(new mscclpp::Bootstrap(isSend, 2));
bootstrap->initialize(ipPortPair);
mscclpp::IbCtx ctx(ibDevName);
mscclpp::IbQp* qp = ctx.createQp();
const mscclpp::IbMr* mr = ctx.registerMr(data.get(), sizeof(int) * nelem);
std::array<mscclpp::IbQpInfo, 2> qpInfo;
qpInfo[isSend] = qp->getInfo();
std::array<mscclpp::IbMrInfo, 2> mrInfo;
mrInfo[isSend] = mr->getInfo();
bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo));
bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo));
for (int i = 0; i < bootstrap->getNranks(); ++i) {
if (i == isSend) continue;
qp->rtr(qpInfo[i]);
qp->rts();
break;
}
printf("connection succeed\n");
bootstrap->barrier();
if (isSend) {
int maxIter = 100000;
double start = getTime();
for (int iter = 0; iter < maxIter; ++iter) {
qp->stageSend(mr, mrInfo[0], sizeof(int) * nelem, 0, 0, 0, true);
qp->postSend();
bool waiting = true;
while (waiting) {
int wcNum = qp->pollCq();
if (wcNum < 0) {
WARN("pollCq failed: errno %d", errno);
return 1;
}
for (int i = 0; i < wcNum; ++i) {
const ibv_wc* wc = qp->getWc(i);
if (wc->status != IBV_WC_SUCCESS) {
WARN("wc status %d", wc->status);
return 1;
}
waiting = false;
break;
}
}
}
// TODO(chhwang): print detailed stats such as avg, 99%p, etc.
printf("%f us/iter\n", (getTime() - start) / maxIter * 1e6);
}
// A simple barrier
bootstrap->barrier();
return 0;
}

test/mp_unit_tests.cu (new file)

@@ -0,0 +1,765 @@
#include <gtest/gtest.h>
#include <mpi.h>
#include <iostream>
#include <mscclpp/channel.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <mscclpp/epoch.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
#include "config.hpp"
#include "ib.hpp"
#include "infiniband/verbs.h"
static const char gDefaultIpPort[] = "127.0.0.1:50053";
class MultiProcessTestEnv : public ::testing::Environment {
public:
MultiProcessTestEnv(int argc, const char** argv) : argc(argc), argv(argv) {}
// Override this to define how to set up the environment.
void SetUp() {
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
// get the number of ranks per node with MPI
MPI_Comm shmcomm;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
int shmrank;
MPI_Comm_size(shmcomm, &shmrank);
nRanksPerNode = shmrank;
MPI_Comm_free(&shmcomm);
// parse the command line arguments
args = parseArgs(argc, argv);
}
// Override this to define how to tear down the environment.
void TearDown() { MPI_Finalize(); }
static std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[]) {
auto printUsage = [](const char* prog) {
std::stringstream ss;
ss << "Usage: " << prog << " [-ip_port IP:PORT]\n";
std::cout << ss.str();
};
std::unordered_map<std::string, std::string> options;
// Default values
options["ip_port"] = gDefaultIpPort;
// Parse the command line arguments
for (int i = 1; i < argc; i++) {
std::string arg = argv[i];
if (arg == "-ip_port") {
if (i + 1 < argc) {
options["ip_port"] = argv[++i];
} else {
throw std::invalid_argument("Error: -ip_port option requires an argument.\n");
}
} else if (arg == "-help" || arg == "-h") {
printUsage(argv[0]);
exit(0);
} else {
throw std::invalid_argument("Error: Unknown option " + std::string(argv[i]) + "\n");
}
}
return options;
}
const int argc;
const char** argv;
int rank;
int worldSize;
int nRanksPerNode;
std::unordered_map<std::string, std::string> args;
};
MultiProcessTestEnv* gEnv = nullptr;
class MultiProcessTest : public ::testing::Test {};
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
gEnv = new MultiProcessTestEnv(argc, (const char**)argv);
::testing::AddGlobalTestEnvironment(gEnv);
return RUN_ALL_TESTS();
}
TEST_F(MultiProcessTest, Prelim) {
// Test to make sure the MPI environment is set up correctly
ASSERT_GE(gEnv->worldSize, 2);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Bootstrap tests
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class BootstrapTest : public MultiProcessTest {
protected:
// Each test case should finish within 3 seconds.
mscclpp::Timer bootstrapTestTimer{3};
};
void bootstrapTestAllGather(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
std::vector<int> tmp(bootstrap->getNranks(), 0);
tmp[bootstrap->getRank()] = bootstrap->getRank() + 1;
bootstrap->allGather(tmp.data(), sizeof(int));
for (int i = 0; i < bootstrap->getNranks(); ++i) {
EXPECT_EQ(tmp[i], i + 1);
}
}
void bootstrapTestBarrier(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) { bootstrap->barrier(); }
void bootstrapTestSendRecv(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i) continue;
int msg1 = (bootstrap->getRank() + 1) * 3;
int msg2 = (bootstrap->getRank() + 1) * 3 + 1;
int msg3 = (bootstrap->getRank() + 1) * 3 + 2;
bootstrap->send(&msg1, sizeof(int), i, 0);
bootstrap->send(&msg2, sizeof(int), i, 1);
bootstrap->send(&msg3, sizeof(int), i, 2);
}
for (int i = 0; i < bootstrap->getNranks(); i++) {
if (bootstrap->getRank() == i) continue;
int msg1 = 0;
int msg2 = 0;
int msg3 = 0;
// recv them in the opposite order to check correctness
bootstrap->recv(&msg2, sizeof(int), i, 1);
bootstrap->recv(&msg3, sizeof(int), i, 2);
bootstrap->recv(&msg1, sizeof(int), i, 0);
EXPECT_EQ(msg1, (i + 1) * 3);
EXPECT_EQ(msg2, (i + 1) * 3 + 1);
EXPECT_EQ(msg3, (i + 1) * 3 + 2);
}
}
void bootstrapTestAll(std::shared_ptr<mscclpp::BaseBootstrap> bootstrap) {
bootstrapTestAllGather(bootstrap);
bootstrapTestBarrier(bootstrap);
bootstrapTestSendRecv(bootstrap);
}
TEST_F(BootstrapTest, WithId) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
bootstrapTestAll(bootstrap);
}
TEST_F(BootstrapTest, WithIpPortPair) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
bootstrap->initialize(gEnv->args["ip_port"]);
bootstrapTestAll(bootstrap);
}
TEST_F(BootstrapTest, ResumeWithId) {
for (int i = 0; i < 5; ++i) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
}
}
TEST_F(BootstrapTest, ResumeWithIpPortPair) {
// TODO: enable when the bug is fixed. bootstrap hangs and even timer doesn't work
#if 0
for (int i = 0; i < 5; ++i) {
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
bootstrap->initialize(gEnv->args["ip_port"]);
}
#else
// TODO: remove when the bug is fixed.
FAIL();
#endif
}
TEST_F(BootstrapTest, ExitBeforeConnect) {
// TODO: enable when the bug is fixed. bootstrap rootThread_ does not exit gracefully
#if 0
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
mscclpp::UniqueId id = bootstrap->createUniqueId();
#else
// TODO: remove when the bug is fixed.
FAIL();
#endif
}
TEST_F(BootstrapTest, TimeoutWithId) {
// TODO: enable when BootstrapTest.ExitBeforeConnect passes.
#if 0
// Set bootstrap timeout to 1 second
mscclpp::Config* cfg = mscclpp::Config::getInstance();
cfg->setBootstrapConnectionTimeoutConfig(1);
// All ranks initialize a bootstrap with their own id (will hang)
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, gEnv->worldSize);
mscclpp::UniqueId id = bootstrap->createUniqueId();
ASSERT_THROW(bootstrap->initialize(id), mscclpp::Error);
// Timeout should be less than 3 seconds
ASSERT_LT(timer.elapsed(), 3000000);
#else
// TODO: remove when BootstrapTest.ExitBeforeConnect passes.
FAIL();
#endif
}
class MPIBootstrap : public mscclpp::BaseBootstrap {
public:
MPIBootstrap() : BaseBootstrap() {}
int getRank() override {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
int getNranks() override {
int worldSize;
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
return worldSize;
}
void allGather(void* sendbuf, int size) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD);
}
void barrier() override { MPI_Barrier(MPI_COMM_WORLD); }
void send(void* sendbuf, int size, int dest, int tag) override {
MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
}
void recv(void* recvbuf, int size, int source, int tag) override {
MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
};
TEST_F(BootstrapTest, MPIBootstrap) {
auto bootstrap = std::make_shared<MPIBootstrap>();
bootstrapTestAll(bootstrap);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// InfiniBand tests
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
static mscclpp::Transport ibIdToTransport(int id) {
mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2,
mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5,
mscclpp::Transport::IB6, mscclpp::Transport::IB7};
return IBs[id];
}
class IbTest : public MultiProcessTest {
protected:
void SetUp() override {
MSCCLPP_CUDATHROW(cudaGetDeviceCount(&cudaDevNum));
cudaDevId = (gEnv->rank % gEnv->nRanksPerNode) % cudaDevNum;
MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId));
int ibDevId = (gEnv->rank % gEnv->nRanksPerNode) / mscclpp::getIBDeviceCount();
ibDevName = mscclpp::getIBDeviceName(ibIdToTransport(ibDevId));
}
int cudaDevNum;
int cudaDevId;
std::string ibDevName;
};
TEST_F(IbTest, SimpleSendRecv) {
if (gEnv->rank >= 2) {
// This test needs only two ranks
return;
}
mscclpp::Timer timer(3);
const int maxIter = 100000;
const int nelem = 1;
auto data = mscclpp::allocUniqueCuda<int>(nelem);
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, 2);
mscclpp::UniqueId id;
if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId();
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
bootstrap->initialize(id);
mscclpp::IbCtx ctx(ibDevName);
mscclpp::IbQp* qp = ctx.createQp();
const mscclpp::IbMr* mr = ctx.registerMr(data.get(), sizeof(int) * nelem);
std::array<mscclpp::IbQpInfo, 2> qpInfo;
qpInfo[gEnv->rank] = qp->getInfo();
std::array<mscclpp::IbMrInfo, 2> mrInfo;
mrInfo[gEnv->rank] = mr->getInfo();
bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo));
bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo));
for (int i = 0; i < bootstrap->getNranks(); ++i) {
if (i == gEnv->rank) continue;
qp->rtr(qpInfo[i]);
qp->rts();
break;
}
bootstrap->barrier();
if (gEnv->rank == 1) {
mscclpp::Timer timer;
for (int iter = 0; iter < maxIter; ++iter) {
qp->stageSend(mr, mrInfo[0], sizeof(int) * nelem, 0, 0, 0, true);
qp->postSend();
bool waiting = true;
int spin = 0;
while (waiting) {
int wcNum = qp->pollCq();
ASSERT_GE(wcNum, 0);
for (int i = 0; i < wcNum; ++i) {
const ibv_wc* wc = qp->getWc(i);
EXPECT_EQ(wc->status, IBV_WC_SUCCESS);
waiting = false;
break;
}
if (spin++ > 1000000) {
FAIL() << "Polling is stuck.";
}
}
}
float us = (float)timer.elapsed();
std::cout << "IbTest.SimpleSendRecv: " << us / maxIter << " us/iter" << std::endl;
}
bootstrap->barrier();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Communicator tests
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CommunicatorTestBase : public MultiProcessTest {
protected:
void SetUp() override {
MultiProcessTest::SetUp();
if (numRanksToUse == -1) {
numRanksToUse = gEnv->worldSize;
}
ASSERT_LE(numRanksToUse, gEnv->worldSize);
std::shared_ptr<mscclpp::Bootstrap> bootstrap;
mscclpp::UniqueId id;
if (gEnv->rank < numRanksToUse) {
bootstrap = std::make_shared<mscclpp::Bootstrap>(gEnv->rank, numRanksToUse);
if (gEnv->rank == 0) id = bootstrap->createUniqueId();
}
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
if (gEnv->rank >= numRanksToUse) {
return;
}
bootstrap->initialize(id);
communicator = std::make_shared<mscclpp::Communicator>(bootstrap);
ibTransport = ibIdToTransport(rankToLocalRank(gEnv->rank));
}
void TearDown() override {
connections.clear();
communicator.reset();
MultiProcessTest::TearDown();
}
void setNumRanksToUse(int num) { numRanksToUse = num; }
int rankToLocalRank(int rank) const { return rank % gEnv->nRanksPerNode; }
int rankToNode(int rank) const { return rank / gEnv->nRanksPerNode; }
void connectMesh(bool useIbOnly = false) {
for (int i = 0; i < numRanksToUse; i++) {
if (i != gEnv->rank) {
if ((rankToNode(i) == rankToNode(gEnv->rank)) && !useIbOnly) {
connections[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc);
} else {
connections[i] = communicator->connectOnSetup(i, 0, ibTransport);
}
}
}
communicator->setup();
}
// Register a local memory and receive corresponding remote memories
void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag,
const std::vector<int>& remoteRanks, mscclpp::RegisteredMemory& localMemory,
std::unordered_map<int, mscclpp::RegisteredMemory>& remoteMemories) {
localMemory = communicator->registerMemory(buff, buffSize, transport);
std::unordered_map<int, mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> futureRemoteMemories;
for (int remoteRank : remoteRanks) {
if (remoteRank != communicator->bootstrapper()->getRank()) {
communicator->sendMemoryOnSetup(localMemory, remoteRank, tag);
futureRemoteMemories[remoteRank] = communicator->recvMemoryOnSetup(remoteRank, tag);
}
}
communicator->setup();
for (int remoteRank : remoteRanks) {
if (remoteRank != communicator->bootstrapper()->getRank()) {
remoteMemories[remoteRank] = futureRemoteMemories[remoteRank].get();
}
}
}
// Register a local memory and receive one corresponding remote memory
void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank,
mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory) {
std::vector<int> remoteRanks = {remoteRank};
std::unordered_map<int, mscclpp::RegisteredMemory> remoteMemories;
registerMemoryPairs(buff, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories);
remoteMemory = remoteMemories[remoteRank];
}
int numRanksToUse = -1;
std::shared_ptr<mscclpp::Communicator> communicator;
mscclpp::Transport ibTransport;
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>> connections;
};
class CommunicatorTest : public CommunicatorTestBase {
protected:
void SetUp() override {
CommunicatorTestBase::SetUp();
ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0);
connectMesh();
devicePtr.resize(numBuffers);
localMemory.resize(numBuffers);
remoteMemory.resize(numBuffers);
std::vector<int> remoteRanks;
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank) {
remoteRanks.push_back(i);
}
}
for (int n = 0; n < numBuffers; n++) {
devicePtr[n] = mscclpp::allocSharedCuda<int>(deviceBufferSize / sizeof(int));
registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0,
remoteRanks, localMemory[n], remoteMemory[n]);
}
}
void TearDown() override {
remoteMemory.clear();
localMemory.clear();
devicePtr.clear();
CommunicatorTestBase::TearDown();
}
void deviceBufferInit() {
size_t dataCount = deviceBufferSize / sizeof(int);
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
for (int i = 0; i < dataCount; i++) {
hostBuffer[i] = gEnv->rank + n * gEnv->worldSize;
}
mscclpp::memcpyCuda<int>(devicePtr[n].get(), hostBuffer.data(), dataCount, cudaMemcpyHostToDevice);
}
}
void writeToRemote(int dataCountPerRank) {
for (int n = 0; n < numBuffers; n++) {
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank) {
auto& conn = connections.at(i);
auto& peerMemory = remoteMemory[n].at(i);
conn->write(peerMemory, gEnv->rank * dataCountPerRank * sizeof(int), localMemory[n],
gEnv->rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int));
conn->flush();
}
}
}
}
bool testWriteCorrectness(bool skipLocal = false) {
size_t dataCount = deviceBufferSize / sizeof(int);
for (int n = 0; n < (int)devicePtr.size(); n++) {
std::vector<int> hostBuffer(dataCount, 0);
mscclpp::memcpyCuda<int>(hostBuffer.data(), devicePtr[n].get(), dataCount, cudaMemcpyDeviceToHost);
for (int i = 0; i < gEnv->worldSize; i++) {
if (((i / gEnv->nRanksPerNode) == (gEnv->rank / gEnv->nRanksPerNode)) && skipLocal) {
continue;
}
for (int j = i * dataCount / gEnv->worldSize; j < (i + 1) * dataCount / gEnv->worldSize; j++) {
if (hostBuffer[j] != i + n * gEnv->worldSize) {
return false;
}
}
}
}
return true;
}
const size_t numBuffers = 10;
const int deviceBufferSize = 1024 * 1024;
std::vector<std::shared_ptr<int>> devicePtr;
std::vector<mscclpp::RegisteredMemory> localMemory;
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>> remoteMemory;
};
TEST_F(CommunicatorTest, BasicWrite) {
if (gEnv->rank >= numRanksToUse) return;
deviceBufferInit();
communicator->bootstrapper()->barrier();
writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
communicator->bootstrapper()->barrier();
// polling until it becomes ready
bool ready = false;
int niter = 0;
do {
ready = testWriteCorrectness();
niter++;
if (niter == 10000) {
FAIL() << "Polling is stuck.";
}
} while (!ready);
communicator->bootstrapper()->barrier();
}
__global__ void kernelIncEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].epochIncrement();
}
}
__global__ void kernelWaitEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) {
int tid = threadIdx.x;
if (tid != rank && tid < worldSize) {
deviceEpochs[tid].wait();
}
}
TEST_F(CommunicatorTest, WriteWithDeviceEpochs) {
if (gEnv->rank >= numRanksToUse) return;
std::unordered_map<int, std::shared_ptr<mscclpp::DeviceEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
epochs.insert({entry.first, std::make_shared<mscclpp::DeviceEpoch>(*communicator.get(), conn)});
}
communicator->setup();
communicator->bootstrapper()->barrier();
deviceBufferInit();
communicator->bootstrapper()->barrier();
auto deviceEpochHandles = mscclpp::allocSharedCuda<mscclpp::DeviceEpoch::DeviceHandle>(gEnv->worldSize);
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank) {
mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle();
mscclpp::memcpyCuda<mscclpp::DeviceEpoch::DeviceHandle>(deviceEpochHandles.get() + i, &deviceHandle, 1,
cudaMemcpyHostToDevice);
}
}
communicator->bootstrapper()->barrier();
writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
kernelIncEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank) {
epochs[i]->signal();
}
}
kernelWaitEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
ASSERT_TRUE(testWriteCorrectness());
communicator->bootstrapper()->barrier();
}
TEST_F(CommunicatorTest, WriteWithHostEpochs) {
if (gEnv->rank >= numRanksToUse) return;
std::unordered_map<int, std::shared_ptr<mscclpp::HostEpoch>> epochs;
for (auto entry : connections) {
auto& conn = entry.second;
// HostEpoch cannot be used with CudaIpc transport
if (conn->transport() == mscclpp::Transport::CudaIpc) continue;
epochs.insert({entry.first, std::make_shared<mscclpp::HostEpoch>(*communicator.get(), conn)});
}
communicator->setup();
communicator->bootstrapper()->barrier();
deviceBufferInit();
communicator->bootstrapper()->barrier();
writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize);
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
epochs[i]->incrementAndSignal();
}
}
for (int i = 0; i < gEnv->worldSize; i++) {
if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) {
epochs[i]->wait();
}
}
ASSERT_TRUE(testWriteCorrectness());
communicator->bootstrapper()->barrier();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Channel tests
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ChannelOneToOneTest : public CommunicatorTestBase {
protected:
void SetUp() override {
// Use only two ranks
setNumRanksToUse(2);
CommunicatorTestBase::SetUp();
channelService = std::make_shared<mscclpp::channel::DeviceChannelService>(*communicator.get());
}
void TearDown() override { CommunicatorTestBase::TearDown(); }
void setupMeshConnections(std::vector<mscclpp::channel::SimpleDeviceChannel>& devChannels, bool useIbOnly,
void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0) {
const int rank = communicator->bootstrapper()->getRank();
const int worldSize = communicator->bootstrapper()->getNranks();
const bool isInPlace = (recvBuff == nullptr);
mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc | ibTransport;
connectMesh(useIbOnly);
for (int r = 0; r < worldSize; r++) {
if (r == rank) {
continue;
}
mscclpp::RegisteredMemory sendMemory;
mscclpp::RegisteredMemory remoteMemory;
void* tmpBuff = nullptr;
if (isInPlace) {
registerMemoryPair(sendBuff, sendBuffBytes, transport, 0, r, sendMemory, remoteMemory);
} else {
sendMemory = communicator->registerMemory(recvBuff, recvBuffBytes, transport);
mscclpp::RegisteredMemory recvMemory;
registerMemoryPair(recvBuff, recvBuffBytes, transport, 0, r, recvMemory, remoteMemory);
tmpBuff = recvMemory.data();
}
mscclpp::channel::ChannelId cid = channelService->addChannel(connections[r]);
communicator->setup();
// TODO: enable this when we support out-of-place
// devChannels.emplace_back(channelService->deviceChannel(cid),
// channelService->addMemory(remoteMemory), channelService->addMemory(sendMemory),
// remoteMemory.data(), sendMemory.data(), tmpBuff);
devChannels.emplace_back(channelService->deviceChannel(cid), channelService->addMemory(remoteMemory),
channelService->addMemory(sendMemory), remoteMemory.data(), sendMemory.data());
}
}
std::shared_ptr<mscclpp::channel::DeviceChannelService> channelService;
};
__constant__ mscclpp::channel::SimpleDeviceChannel gChannelOneToOneTestConstDevChans;
__global__ void kernelPingPong(int rank, int nElem) {
mscclpp::channel::SimpleDeviceChannel& devChan = gChannelOneToOneTestConstDevChans;
volatile int* sendBuff = (volatile int*)devChan.srcPtr_;
int nTries = 1000;
int flusher = 0;
int rank1Offset = 10000000;
for (int i = 0; i < nTries; i++) {
if (rank == 0) {
if (i > 0) {
if (threadIdx.x == 0) devChan.wait();
__syncthreads();
for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
if (sendBuff[j] != rank1Offset + i - 1 + j) {
printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j);
}
}
}
for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
sendBuff[j] = i + j;
}
__syncthreads();
// __threadfence_system(); // not necessary if we make sendBuff volatile
if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int));
}
if (rank == 1) {
if (threadIdx.x == 0) devChan.wait();
__syncthreads();
for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
if (sendBuff[j] != i + j) {
printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j);
}
}
if (i < nTries - 1) {
for (int j = threadIdx.x; j < nElem; j += blockDim.x) {
sendBuff[j] = rank1Offset + i + j;
}
__syncthreads();
// __threadfence_system(); // not necessary if we make sendBuff volatile
if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int));
}
}
flusher++;
if (flusher == 100) {
devChan.flush();
flusher = 0;
}
}
}
TEST_F(ChannelOneToOneTest, PingPongIb) {
if (gEnv->rank >= numRanksToUse) return;
const int nElem = 4 * 1024 * 1024;
std::vector<mscclpp::channel::SimpleDeviceChannel> devChannels;
std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
setupMeshConnections(devChannels, true, buff.get(), nElem * sizeof(int));
ASSERT_EQ(devChannels.size(), 1);
MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDevChans, devChannels.data(),
sizeof(mscclpp::channel::SimpleDeviceChannel)));
channelService->startProxy();
kernelPingPong<<<1, 1024>>>(gEnv->rank, 1);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
kernelPingPong<<<1, 1024>>>(gEnv->rank, 1024);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
kernelPingPong<<<1, 1024>>>(gEnv->rank, 1024 * 1024);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
kernelPingPong<<<1, 1024>>>(gEnv->rank, 4 * 1024 * 1024);
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
channelService->stopProxy();
}

View File

@@ -47,7 +47,8 @@ __device__ void alltoall1(int rank, int nRanksPerNode, size_t nElements) {
__global__ void kernel(int rank, int worldSize, size_t nElements, int nRanksPerNode, int kernelNum) {
if (kernelNum == 0) {
alltoall0(rank, worldSize, nElements);
} if (kernelNum == 1) {
}
if (kernelNum == 1) {
alltoall1(rank, nRanksPerNode, nElements);
}
}
@@ -69,7 +70,7 @@ void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
const int kernelNum = args.kernelNum;
const int nRanksPerNode = args.nRanksPerNode;
CUDATHROW(cudaMemcpyAsync((int*)localRecvBuff + paramCount_ * rank, (int*)localSendBuff + paramCount_ * rank,
paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream));
paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream));
kernel<<<worldSize - 1, 32, 0, stream>>>(rank, worldSize, paramCount_, nRanksPerNode, kernelNum);
}

View File

@@ -72,8 +72,11 @@ double allreduceTime(int worldSize, double value, int average) {
double accumulator = value;
if (average != 0) {
MPI_Op op =
average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? MPI_SUM : MPI_Op();
MPI_Op op = average == 1 ? MPI_SUM
: average == 2 ? MPI_MIN
: average == 3 ? MPI_MAX
: average == 4 ? MPI_SUM
: MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD);
}

View File

@@ -1,4 +1,8 @@
target_sources(unit_tests PRIVATE
core_tests.cc
cuda_memory_tests.cc
cuda_utils_tests.cc
errors_tests.cc
fifo_tests.cu
numa_tests.cc
utils_tests.cc
)

View File

@@ -37,14 +37,14 @@ TEST_F(LocalCommunicatorTest, RegisterMemory) {
EXPECT_EQ(memory.transports(), mscclpp::NoTransports);
}
TEST_F(LocalCommunicatorTest, SendMemoryToSelf) {
int dummy[42];
auto memory = comm->registerMemory(&dummy, sizeof(dummy), mscclpp::NoTransports);
comm->sendMemoryOnSetup(memory, 0, 0);
auto memoryFuture = comm->recvMemoryOnSetup(0, 0);
comm->setup();
auto sameMemory = memoryFuture.get();
EXPECT_EQ(sameMemory.size(), memory.size());
EXPECT_EQ(sameMemory.rank(), memory.rank());
EXPECT_EQ(sameMemory.transports(), memory.transports());
}
// TEST_F(LocalCommunicatorTest, SendMemoryToSelf) {
// int dummy[42];
// auto memory = comm->registerMemory(&dummy, sizeof(dummy), mscclpp::NoTransports);
// comm->sendMemoryOnSetup(memory, 0, 0);
// auto memoryFuture = comm->recvMemoryOnSetup(0, 0);
// comm->setup();
// auto sameMemory = memoryFuture.get();
// EXPECT_EQ(sameMemory.size(), memory.size());
// EXPECT_EQ(sameMemory.rank(), memory.rank());
// EXPECT_EQ(sameMemory.transports(), memory.transports());
// }

View File

@@ -1,13 +0,0 @@
#include <gtest/gtest.h>
#include <mscclpp/cuda_utils.hpp>
TEST(CudaMemoryTest, Shared) {
auto p1 = mscclpp::allocSharedCuda<uint32_t>();
auto p2 = mscclpp::allocSharedCuda<int64_t>(5);
}
TEST(CudaMemoryTest, Unique) {
auto p1 = mscclpp::allocUniqueCuda<uint32_t>();
auto p2 = mscclpp::allocUniqueCuda<int64_t>(5);
}

View File

@@ -0,0 +1,39 @@
#include <gtest/gtest.h>
#include <mscclpp/cuda_utils.hpp>
TEST(CudaUtilsTest, AllocShared) {
auto p1 = mscclpp::allocSharedCuda<uint32_t>();
auto p2 = mscclpp::allocSharedCuda<int64_t>(5);
}
TEST(CudaUtilsTest, AllocUnique) {
auto p1 = mscclpp::allocUniqueCuda<uint32_t>();
auto p2 = mscclpp::allocUniqueCuda<int64_t>(5);
}
TEST(CudaUtilsTest, MakeSharedHost) {
auto p1 = mscclpp::makeSharedCudaHost<uint32_t>();
auto p2 = mscclpp::makeSharedCudaHost<int64_t>(5);
}
TEST(CudaUtilsTest, MakeUniqueHost) {
auto p1 = mscclpp::makeUniqueCudaHost<uint32_t>();
auto p2 = mscclpp::makeUniqueCudaHost<int64_t>(5);
}
TEST(CudaUtilsTest, Memcpy) {
const int nElem = 1024;
std::vector<int> hostBuff(nElem);
for (int i = 0; i < nElem; ++i) {
hostBuff[i] = i + 1;
}
std::vector<int> hostBuffTmp(nElem, 0);
auto devBuff = mscclpp::allocSharedCuda<int>(nElem);
mscclpp::memcpyCuda<int>(devBuff.get(), hostBuff.data(), nElem, cudaMemcpyHostToDevice);
mscclpp::memcpyCuda<int>(hostBuffTmp.data(), devBuff.get(), nElem, cudaMemcpyDeviceToHost);
for (int i = 0; i < nElem; ++i) {
EXPECT_EQ(hostBuff[i], hostBuffTmp[i]);
}
}

33
test/unit/errors_tests.cc Normal file
View File

@@ -0,0 +1,33 @@
#include <gtest/gtest.h>
#include <mscclpp/errors.hpp>
TEST(ErrorsTest, SystemError) {
mscclpp::Error error("test", mscclpp::ErrorCode::SystemError);
EXPECT_EQ(error.getErrorCode(), static_cast<int>(mscclpp::ErrorCode::SystemError));
EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: SystemError)"));
}
TEST(ErrorsTest, InternalError) {
mscclpp::Error error("test", mscclpp::ErrorCode::InternalError);
EXPECT_EQ(error.getErrorCode(), static_cast<int>(mscclpp::ErrorCode::InternalError));
EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: InternalError)"));
}
TEST(ErrorsTest, InvalidUsage) {
mscclpp::Error error("test", mscclpp::ErrorCode::InvalidUsage);
EXPECT_EQ(error.getErrorCode(), static_cast<int>(mscclpp::ErrorCode::InvalidUsage));
EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: InvalidUsage)"));
}
TEST(ErrorsTest, Timeout) {
mscclpp::Error error("test", mscclpp::ErrorCode::Timeout);
EXPECT_EQ(error.getErrorCode(), static_cast<int>(mscclpp::ErrorCode::Timeout));
EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: Timeout)"));
}
TEST(ErrorsTest, UnknownError) {
mscclpp::Error error("test", static_cast<mscclpp::ErrorCode>(-1));
EXPECT_EQ(error.getErrorCode(), -1);
EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: UnknownError)"));
}

67
test/unit/fifo_tests.cu Normal file
View File

@@ -0,0 +1,67 @@
#include <gtest/gtest.h>
#include <mscclpp/cuda_utils.hpp>
#include <mscclpp/fifo.hpp>
#include <mscclpp/utils.hpp>
#define FLUSH_PERIOD (MSCCLPP_PROXY_FIFO_SIZE) // should not exceed MSCCLPP_PROXY_FIFO_SIZE
#define ITER 10000 // should be larger than MSCCLPP_PROXY_FIFO_SIZE for proper testing
__constant__ mscclpp::DeviceProxyFifo gFifoTestDeviceProxyFifo;
__global__ void kernelFifoTest() {
if (threadIdx.x + blockIdx.x * blockDim.x != 0) return;
mscclpp::DeviceProxyFifo& fifo = gFifoTestDeviceProxyFifo;
mscclpp::ProxyTrigger trigger;
for (uint64_t i = 1; i < ITER + 1; ++i) {
trigger.fst = i;
trigger.snd = i;
uint64_t curFifoHead = fifo.push(trigger);
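// Sync once every FLUSH_PERIOD pushes so the device never runs more than
// MSCCLPP_PROXY_FIFO_SIZE triggers ahead of the host-side consumer.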
if (i % FLUSH_PERIOD == 0) {
fifo.sync(curFifoHead);
}
}
}
TEST(FifoTest, HostProxyFifo) {
ASSERT_LE(FLUSH_PERIOD, MSCCLPP_PROXY_FIFO_SIZE);
mscclpp::HostProxyFifo hostFifo;
mscclpp::DeviceProxyFifo devFifo = hostFifo.deviceFifo();
MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gFifoTestDeviceProxyFifo, &devFifo, sizeof(devFifo)));
kernelFifoTest<<<1, 1>>>();
MSCCLPP_CUDATHROW(cudaGetLastError());
mscclpp::ProxyTrigger trigger;
trigger.fst = 0;
trigger.snd = 0;
uint64_t spin = 0;
uint64_t flushCnt = 0;
mscclpp::Timer timer(3);
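// Host side: poll each trigger in order, pop it, and periodically flush the
// tail back to the device so the kernel's fifo.sync() calls can make progress.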
for (uint64_t i = 0; i < ITER; ++i) {
while (trigger.fst == 0) {
hostFifo.poll(&trigger);
if (spin++ > 1000000) {
FAIL() << "Polling is stuck.";
}
}
ASSERT_TRUE(trigger.fst == (i + 1));
ASSERT_TRUE(trigger.snd == (i + 1));
hostFifo.pop();
if ((++flushCnt % FLUSH_PERIOD) == 0) {
hostFifo.flushTail();
}
trigger.fst = 0;
spin = 0;
}
hostFifo.flushTail(true);
std::stringstream ss;
ss << "FifoTest.HostProxyFifo: " << (float)timer.elapsed() / ITER << " us/iter\n";
std::cout << ss.str();
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
}

18
test/unit/numa_tests.cc Normal file
View File

@@ -0,0 +1,18 @@
#include <gtest/gtest.h>
#include <mscclpp/cuda_utils.hpp>
#include "numa.hpp"
TEST(NumaTest, Basic) {
int num;
MSCCLPP_CUDATHROW(cudaGetDeviceCount(&num));
if (num == 0) {
return;
}
for (int i = 0; i < num; i++) {
int numaNode = mscclpp::getDeviceNumaNode(i);
EXPECT_GE(numaNode, 0);
mscclpp::numaBind(numaNode);
}
}

49
test/unit/utils_tests.cc Normal file
View File

@@ -0,0 +1,49 @@
#include <gtest/gtest.h>
#include <mscclpp/errors.hpp>
#include <mscclpp/utils.hpp>
TEST(UtilsTest, Timer) {
mscclpp::Timer timer;
sleep(1);
int64_t elapsed = timer.elapsed();
EXPECT_GE(elapsed, 1000000);
timer.reset();
sleep(1);
elapsed = timer.elapsed();
EXPECT_GE(elapsed, 1000000);
EXPECT_LT(elapsed, 1100000);
}
TEST(UtilsTest, TimerTimeout) {
mscclpp::Timer timer(1);
ASSERT_THROW(sleep(2), mscclpp::Error);
}
TEST(UtilsTest, TimerTimeoutReset) {
mscclpp::Timer timer(3);
sleep(2);
// Resetting the timer should prevent the timeout.
timer.reset();
ASSERT_NO_THROW(sleep(2));
// Elapsed time should be slightly larger than 2 seconds.
EXPECT_GT(timer.elapsed(), 2000000);
EXPECT_LT(timer.elapsed(), 2100000);
}
TEST(UtilsTest, ScopedTimer) {
mscclpp::ScopedTimer timerA("UtilsTest.ScopedTimer.A");
mscclpp::ScopedTimer timerB("UtilsTest.ScopedTimer.B");
sleep(1);
int64_t elapsedA = timerA.elapsed();
int64_t elapsedB = timerB.elapsed();
EXPECT_GE(elapsedA, 1000000);
EXPECT_GE(elapsedB, 1000000);
}
TEST(UtilsTest, getHostName) {
std::string hostname = mscclpp::getHostName(1024, '.');
EXPECT_FALSE(hostname.empty());
}