mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-22 05:48:29 +00:00
more debugging info + testing 1000 memory registrations
This commit is contained in:
@@ -61,6 +61,8 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
|
||||
char* srcPtr = (char*)src.data();
|
||||
|
||||
CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream));
|
||||
INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size);
|
||||
|
||||
// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
|
||||
}
|
||||
|
||||
@@ -114,6 +116,7 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem
|
||||
qp->stageSend(srcMr, dstMrInfo, (uint32_t)size, /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset,
|
||||
/*signaled=*/true);
|
||||
qp->postSend();
|
||||
INFO(MSCCLPP_NET, "IBConnection write: from %p to %p, size %lu", (uint8_t*)srcMr->getBuff() + srcOffset, (uint8_t*)dstMrInfo.addr + dstOffset, size);
|
||||
// npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size);
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,55 @@ mscclpp::Transport findIb(int localRank)
|
||||
return IBs[localRank];
|
||||
}
|
||||
|
||||
void register_all_memories(std::unique_ptr<mscclpp::Communicator>& communicator, int rank, int worldSize, void* devicePtr, size_t deviceBufferSize, mscclpp::Transport myIbDevice, mscclpp::RegisteredMemory& localMemory, std::unordered_map<int, mscclpp::RegisteredMemory>& remoteMemory){
|
||||
localMemory = communicator->registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice);
|
||||
int serializedSize = 0;
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank){
|
||||
auto serialized = localMemory.serialize();
|
||||
serializedSize = serialized.size();
|
||||
communicator->bootstrapper()->send(serialized.data(), serializedSize, i, 0);
|
||||
}
|
||||
}
|
||||
if (serializedSize == 0) {
|
||||
throw std::runtime_error("Serialized size should have been set to a non-zero value.");
|
||||
}
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank){
|
||||
std::vector<char> deserialized(serializedSize);
|
||||
communicator->bootstrapper()->recv(deserialized.data(), serializedSize, i, 0);
|
||||
auto remote = mscclpp::RegisteredMemory::deserialize(deserialized);
|
||||
remoteMemory[i] = remote;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a connection (tag 0) from this rank to every other rank and then runs
/// the communicator's connection setup.
///
/// A peer whose rank divided by `nRanksPerNode` matches this rank's quotient is
/// connected over CudaIpc; every other peer is connected over `myIbDevice`.
/// The resulting connections are stored in `connections`, keyed by peer rank.
void make_connections(std::unique_ptr<mscclpp::Communicator>& communicator, int rank, int worldSize,
                      int nRanksPerNode, mscclpp::Transport myIbDevice,
                      std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections)
{
  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer == rank) {
      continue;  // never connect to ourselves
    }
    // Pick the transport first, then issue a single connect call.
    const bool sameNode = (peer / nRanksPerNode) == (rank / nRanksPerNode);
    const mscclpp::Transport transport = sameNode ? mscclpp::Transport::CudaIpc : myIbDevice;
    connections[peer] = communicator->connect(peer, 0, transport);
  }
  communicator->connectionSetup();
}
|
||||
|
||||
void write_remote(int rank, int worldSize, std::unordered_map<int, std::shared_ptr<mscclpp::Connection>>& connections, std::unordered_map<int, mscclpp::RegisteredMemory>& remoteRegisteredMemories, mscclpp::RegisteredMemory& registeredMemory, int writeSize){
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank) {
|
||||
auto& conn = connections.at(i);
|
||||
auto& peerMemory = remoteRegisteredMemories.at(i);
|
||||
// printf("write to rank: %d, rank is %d\n", peerMemory.rank(), rank);
|
||||
conn->write(peerMemory, rank * writeSize, registeredMemory, rank * writeSize, writeSize);
|
||||
conn->flush();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void test_communicator(int rank, int worldSize, int nranksPerNode)
|
||||
{
|
||||
auto bootstrap = std::make_shared<mscclpp::Bootstrap>(rank, worldSize);
|
||||
@@ -32,104 +81,90 @@ void test_communicator(int rank, int worldSize, int nranksPerNode)
|
||||
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
|
||||
bootstrap->initialize(id);
|
||||
|
||||
auto communicator = std::make_shared<mscclpp::Communicator>(bootstrap);
|
||||
auto communicator = std::make_unique<mscclpp::Communicator>(bootstrap);
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "Communicator initialization passed" << std::endl;
|
||||
|
||||
std::unordered_map<int, std::shared_ptr<mscclpp::Connection>> connections;
|
||||
auto myIbDevice = findIb(rank % nranksPerNode);
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank) {
|
||||
std::shared_ptr<mscclpp::Connection> conn;
|
||||
if (i / nranksPerNode == rank / nranksPerNode) {
|
||||
conn = communicator->connect(i, 0, mscclpp::Transport::CudaIpc);
|
||||
} else {
|
||||
conn = communicator->connect(i, 0, myIbDevice);
|
||||
}
|
||||
connections[i] = conn;
|
||||
}
|
||||
}
|
||||
communicator->connectionSetup();
|
||||
|
||||
make_connections(communicator, rank, worldSize, nranksPerNode, myIbDevice, connections);
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "Connection setup passed" << std::endl;
|
||||
|
||||
int* devicePtr;
|
||||
int size = 1024;
|
||||
CUDATHROW(cudaMalloc(&devicePtr, size));
|
||||
auto registeredMemory = communicator->registerMemory(devicePtr, size, mscclpp::Transport::CudaIpc | myIbDevice);
|
||||
int numBuffers = 1000;
|
||||
std::vector<int*> devicePtr(numBuffers);
|
||||
int deviceBufferSize = 1024*1024;
|
||||
|
||||
std::vector<mscclpp::RegisteredMemory> localMemory(numBuffers);
|
||||
std::vector<std::unordered_map<int, mscclpp::RegisteredMemory>> remoteMemory(numBuffers);
|
||||
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank){
|
||||
auto serialized = registeredMemory.serialize();
|
||||
int serializedSize = serialized.size();
|
||||
bootstrap->send(&serializedSize, sizeof(int), i, 0);
|
||||
bootstrap->send(serialized.data(), serializedSize, i, 1);
|
||||
}
|
||||
for (int n = 0; n < numBuffers; n++) {
|
||||
if (n % 100 == 0)
|
||||
std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl;
|
||||
CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize));
|
||||
register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n], remoteMemory[n]);
|
||||
}
|
||||
std::unordered_map<int, mscclpp::RegisteredMemory> registeredMemories;
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank){
|
||||
int deserializedSize;
|
||||
bootstrap->recv(&deserializedSize, sizeof(int), i, 0);
|
||||
std::vector<char> deserialized(deserializedSize);
|
||||
bootstrap->recv(deserialized.data(), deserializedSize, i, 1);
|
||||
auto deserializedRegisteredMemory = mscclpp::RegisteredMemory::deserialize(deserialized);
|
||||
registeredMemories.insert({deserializedRegisteredMemory.rank(), deserializedRegisteredMemory});
|
||||
}
|
||||
}
|
||||
|
||||
bootstrap->barrier();
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "Memory registration passed" << std::endl;
|
||||
std::cout << "Memory registration for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
|
||||
|
||||
assert((size / sizeof(int)) % worldSize == 0);
|
||||
size_t writeSize = size / worldSize;
|
||||
size_t dataCount = size / sizeof(int);
|
||||
// std::vector<int> hostBuffer(dataCount, 0);
|
||||
std::shared_ptr<int[]> hostBuffer(new int[dataCount]);
|
||||
for (int i = 0; i < dataCount; i++) {
|
||||
hostBuffer[i] = rank;
|
||||
|
||||
assert((deviceBufferSize / sizeof(int)) % worldSize == 0);
|
||||
size_t writeSize = deviceBufferSize / worldSize;
|
||||
size_t dataCount = deviceBufferSize / sizeof(int);
|
||||
for (int n = 0; n < numBuffers; n++){
|
||||
std::vector<int> hostBuffer(dataCount, 0);
|
||||
for (int i = 0; i < dataCount; i++) {
|
||||
hostBuffer[i] = rank + n * worldSize;
|
||||
}
|
||||
CUDATHROW(cudaMemcpy(devicePtr[n], hostBuffer.data(), deviceBufferSize, cudaMemcpyHostToDevice));
|
||||
}
|
||||
CUDATHROW(cudaMemcpy(devicePtr, hostBuffer.get(), size, cudaMemcpyHostToDevice));
|
||||
CUDATHROW(cudaDeviceSynchronize());
|
||||
|
||||
bootstrap->barrier();
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
if (i != rank) {
|
||||
auto& conn = connections.at(i);
|
||||
auto& peerMemory = registeredMemories.at(i);
|
||||
// printf("write to rank: %d, rank is %d\n", peerMemory.rank(), rank);
|
||||
conn->write(peerMemory, rank * writeSize, registeredMemory, rank * writeSize, writeSize);
|
||||
conn->flush();
|
||||
}
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "CUDA memory initialization passed" << std::endl;
|
||||
|
||||
for (int n = 0; n < numBuffers; n++){
|
||||
write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], writeSize);
|
||||
}
|
||||
bootstrap->barrier();
|
||||
// polling until it becomes ready
|
||||
bool ready = false;
|
||||
int niter = 0;
|
||||
do {
|
||||
ready = true;
|
||||
CUDATHROW(cudaMemcpy(hostBuffer.get(), devicePtr, size, cudaMemcpyDeviceToHost));
|
||||
size_t dataPerRank = writeSize / sizeof(int);
|
||||
for (int i = 0; i < dataCount; i++) {
|
||||
if (hostBuffer[i] != i / dataPerRank) {
|
||||
ready = false;
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "RDMA write for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
|
||||
|
||||
for (int n = 0; n < numBuffers; n++){
|
||||
// polling until it becomes ready
|
||||
bool ready = false;
|
||||
int niter = 0;
|
||||
std::vector<int> hostBuffer(dataCount, 0);
|
||||
do {
|
||||
ready = true;
|
||||
CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], deviceBufferSize, cudaMemcpyDeviceToHost));
|
||||
for (int i = 0; i < worldSize; i++) {
|
||||
for (int j = i*writeSize/sizeof(int); j < (i+1)*writeSize/sizeof(int); j++) {
|
||||
if (hostBuffer[j] != i + n * worldSize) {
|
||||
ready = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (niter == 10000){
|
||||
throw std::runtime_error("Polling is stuck.");
|
||||
}
|
||||
niter++;
|
||||
} while (!ready);
|
||||
if (niter == 10000){
|
||||
throw std::runtime_error("Polling is stuck.");
|
||||
}
|
||||
niter++;
|
||||
} while (!ready);
|
||||
}
|
||||
|
||||
bootstrap->barrier();
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "Connection write passed" << std::endl;
|
||||
std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl;
|
||||
|
||||
CUDATHROW(cudaFree(devicePtr));
|
||||
if (bootstrap->getRank() == 0)
|
||||
std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl;
|
||||
|
||||
for (int n = 0; n < numBuffers; n++){
|
||||
CUDATHROW(cudaFree(devicePtr[n]));
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
|
||||
Reference in New Issue
Block a user