From aaa3f0e94521c5c3ec24a67b6a39e6aa31c71917 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Thu, 27 Apr 2023 19:17:19 +0000 Subject: [PATCH] host hashes in communicator --- src/communicator.cc | 20 +++++++++++++++++++- src/include/communicator.hpp | 1 + tests/communicator_test_cpp.cc | 3 ++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/communicator.cc b/src/communicator.cc index 7e1348e8..6f458fe5 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -1,3 +1,5 @@ +#include + #include "mscclpp.hpp" #include "communicator.hpp" #include "host_connection.hpp" @@ -12,7 +14,13 @@ namespace mscclpp { -Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_(bootstrap) {} +Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_(bootstrap) { + rankToHash_.resize(bootstrap->getNranks()); + auto hostHash = getHostHash(); + INFO(MSCCLPP_INIT, "Host hash: %lx", hostHash); + rankToHash_[bootstrap->getRank()] = hostHash; + bootstrap->allGather(rankToHash_.data(), sizeof(uint64_t)); +} Communicator::Impl::~Impl() { ibContexts.clear(); @@ -67,11 +75,21 @@ RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, TransportF MSCCLPP_API_CPP std::shared_ptr Communicator::connect(int remoteRank, int tag, TransportFlags transport) { std::shared_ptr conn; if (transport | TransportCudaIpc) { + // sanity check: make sure the IPC connection is being made within a node + if (pimpl->rankToHash_[remoteRank] != pimpl->rankToHash_[pimpl->bootstrap_->getRank()]) { + std::stringstream ss; + ss << "Cuda IPC connection can only be made within a node: " << remoteRank << " != " << pimpl->bootstrap_->getRank(); + throw std::runtime_error(ss.str()); + } auto cudaIpcConn = std::make_shared(); conn = cudaIpcConn; + INFO(MSCCLPP_INIT, "Cuda IPC connection between %d(%lx) and %d(%lx) created", pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], + remoteRank, pimpl->rankToHash_[remoteRank]); } else if (transport | TransportAllIB) { auto ibConn = std::make_shared(remoteRank, tag, transport, *pimpl); conn = ibConn; + INFO(MSCCLPP_INIT, "IB connection between %d(%lx) via %s and %d(%lx) created", pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], + getIBDeviceName(transport).c_str(), remoteRank, pimpl->rankToHash_[remoteRank]); } else { throw std::runtime_error("Unsupported transport"); } diff --git a/src/include/communicator.hpp b/src/include/communicator.hpp index 53d0fd73..5be00a67 100644 --- a/src/include/communicator.hpp +++ b/src/include/communicator.hpp @@ -18,6 +18,7 @@ struct Communicator::Impl { std::vector> connections; std::unordered_map> ibContexts; std::shared_ptr bootstrap_; + std::vector rankToHash_; Impl(std::shared_ptr bootstrap); diff --git a/tests/communicator_test_cpp.cc b/tests/communicator_test_cpp.cc index d3fe15b0..05595313 100644 --- a/tests/communicator_test_cpp.cc +++ b/tests/communicator_test_cpp.cc @@ -30,7 +30,8 @@ void test_communicator(int rank, int worldSize, int nranksPerNode){ auto communicator = std::make_shared(bootstrap); for (int i = 0; i < worldSize; i++){ if (i != rank){ - if (i % nranksPerNode == rank % nranksPerNode){ + if (i / nranksPerNode == rank / nranksPerNode){ + printf("i %d rank %d nranksPerNode %d\n", i, rank, nranksPerNode); auto connect = communicator->connect(i, 0, mscclpp::TransportCudaIpc); } else { auto connect = communicator->connect(i, 0, findIb(rank % nranksPerNode));