From 82c27625e604c7ccd3d138adefddf1778b0e0e09 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Thu, 27 Apr 2023 21:33:15 +0000 Subject: [PATCH] ipc uses a base ptr now --- Makefile | 2 +- src/basic_proxy_handler.cc | 8 +- src/bootstrap/bootstrap.cc | 10 +- src/communicator.cc | 61 ++++++---- src/connection.cc | 60 ++++++---- src/epoch.cc | 13 +- src/fifo.cc | 29 +++-- src/host_connection.cc | 55 ++++++--- src/ib.cc | 131 +++++++++++--------- src/include/basic_proxy_handler.hpp | 4 +- src/include/channel.hpp | 70 +++++++---- src/include/checks.hpp | 11 ++ src/include/comm.h | 4 +- src/include/communicator.hpp | 11 +- src/include/connection.hpp | 27 +++-- src/include/epoch.hpp | 11 +- src/include/host_connection.hpp | 7 +- src/include/ib.hpp | 8 +- src/include/mscclpp.h | 10 +- src/include/mscclpp.hpp | 180 +++++++++++++++++----------- src/include/mscclppfifo.hpp | 25 ++-- src/include/proxy.h | 2 +- src/include/proxy.hpp | 10 +- src/include/registered_memory.hpp | 15 ++- src/include/registered_ptr.hpp | 34 ++++-- src/init.cc | 47 +++++--- src/proxy_cpp.cc | 28 +++-- src/registered_memory.cc | 60 +++++++--- tests/allgather_test_cpp.cu | 43 +++---- tests/bootstrap_test_cpp.cc | 52 +++++--- tests/communicator_test_cpp.cc | 32 ++--- tests/unittests/ib_test.cc | 2 +- 32 files changed, 650 insertions(+), 412 deletions(-) diff --git a/Makefile b/Makefile index 950751d7..41896041 100644 --- a/Makefile +++ b/Makefile @@ -61,7 +61,7 @@ endif NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 --expt-extended-lambda -Xfatbin -compress-all # Use addprefix so that we can specify more than one path -NVLDFLAGS := -L$(CUDA_LIB) -lcudart -lrt +NVLDFLAGS := -L$(CUDA_LIB) -lcudart -lrt -lcuda ifeq ($(DEBUG), 0) NVCUFLAGS += -O3 diff --git a/src/basic_proxy_handler.cc b/src/basic_proxy_handler.cc index 482aa842..42470131 100644 --- a/src/basic_proxy_handler.cc +++ b/src/basic_proxy_handler.cc @@ -2,15 +2,17 @@ namespace mscclpp { -ProxyHandler makeBasicProxyHandler(Communicator::Impl &comm) { +ProxyHandler makeBasicProxyHandler(Communicator::Impl& comm) +{ return [&comm](ProxyTrigger triggerRaw) { - ChannelTrigger *trigger = reinterpret_cast(&triggerRaw); + ChannelTrigger* trigger = reinterpret_cast(&triggerRaw); HostConnection& conn = *comm.connections.at(trigger->fields.connId); auto result = ProxyHandlerResult::Continue; if (trigger->fields.type & mscclppData) { - conn.put(trigger->fields.dstBufferHandle, trigger->fields.dstOffset, trigger->fields.srcBufferHandle, trigger->fields.srcOffset, trigger->fields.size); + conn.put(trigger->fields.dstBufferHandle, trigger->fields.dstOffset, trigger->fields.srcBufferHandle, + trigger->fields.srcOffset, trigger->fields.size); } if (trigger->fields.type & mscclppFlag) { diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index dfce50b4..75225799 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -180,9 +180,8 @@ Bootstrap::Impl::~Impl() } } -void Bootstrap::Impl::getRemoteAddresses(mscclppSocket* listenSock, - std::vector& rankAddresses, - std::vector& rankAddressesRoot, int& rank) +void Bootstrap::Impl::getRemoteAddresses(mscclppSocket* listenSock, std::vector& rankAddresses, + std::vector& rankAddressesRoot, int& rank) { mscclppSocket sock; ExtInfo info; @@ -211,7 +210,7 @@ void Bootstrap::Impl::getRemoteAddresses(mscclppSocket* listenSock, } void Bootstrap::Impl::sendHandleToPeer(int peer, const std::vector& rankAddresses, - const std::vector& rankAddressesRoot) + const std::vector& rankAddressesRoot) { mscclppSocket sock; int next = (peer + 1) % this->nRanks_; @@ -226,7 +225,8 @@ void Bootstrap::Impl::bootstrapCreateRoot() mscclppSocket listenSock; // mscclppSocket* listenSock = new mscclppSocket(); // TODO(saemal) make this a shared ptr - MSCCLPPTHROW(mscclppSocketInit(&listenSock, &uniqueId_.addr, uniqueId_.magic, mscclppSocketTypeBootstrap, nullptr, 0)); + MSCCLPPTHROW( + mscclppSocketInit(&listenSock, &uniqueId_.addr, uniqueId_.magic, mscclppSocketTypeBootstrap, nullptr, 0)); MSCCLPPTHROW(mscclppSocketListen(&listenSock)); MSCCLPPTHROW(mscclppSocketGetAddr(&listenSock, &uniqueId_.addr)); auto lambda = [this, listenSock]() { this->bootstrapRoot(listenSock); }; diff --git a/src/communicator.cc b/src/communicator.cc index bdccf8eb..78df252d 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -1,20 +1,21 @@ #include -#include "mscclpp.hpp" -#include "communicator.hpp" -#include "host_connection.hpp" -#include "comm.h" -#include "basic_proxy_handler.hpp" #include "api.h" -#include "utils.h" +#include "basic_proxy_handler.hpp" #include "checks.hpp" -#include "debug.h" +#include "comm.h" +#include "communicator.hpp" #include "connection.hpp" +#include "debug.h" +#include "host_connection.hpp" +#include "mscclpp.hpp" #include "registered_memory.hpp" +#include "utils.h" namespace mscclpp { -Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_(bootstrap) { +Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_(bootstrap) +{ rankToHash_.resize(bootstrap->getNranks()); auto hostHash = getHostHash(); INFO(MSCCLPP_INIT, "Host hash: %lx", hostHash); @@ -22,11 +23,13 @@ Communicator::Impl::Impl(std::shared_ptr bootstrap) : bootstrap_( bootstrap->allGather(rankToHash_.data(), sizeof(uint64_t)); } -Communicator::Impl::~Impl() { +Communicator::Impl::~Impl() +{ ibContexts.clear(); } -IbCtx* Communicator::Impl::getIbContext(Transport ibTransport) { +IbCtx* Communicator::Impl::getIbContext(Transport ibTransport) +{ // Find IB context or create it auto it = ibContexts.find(ibTransport); if (it == ibContexts.end()) { @@ -40,39 +43,50 @@ IbCtx* Communicator::Impl::getIbContext(Transport ibTransport) { MSCCLPP_API_CPP Communicator::~Communicator() = default; -MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr bootstrap) : pimpl(std::make_unique(bootstrap)) {} +MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr bootstrap) + : pimpl(std::make_unique(bootstrap)) +{ +} -MSCCLPP_API_CPP void Communicator::bootstrapAllGather(void* data, int size) { +MSCCLPP_API_CPP void Communicator::bootstrapAllGather(void* data, int size) +{ mscclppBootstrapAllGather(pimpl->comm, data, size); } -MSCCLPP_API_CPP void Communicator::bootstrapBarrier() { +MSCCLPP_API_CPP void Communicator::bootstrapBarrier() +{ mscclppBootstrapBarrier(pimpl->comm); } -RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, TransportFlags transports) { +RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, TransportFlags transports) +{ return RegisteredMemory(std::make_shared(ptr, size, pimpl->comm->rank, transports, *pimpl)); } -MSCCLPP_API_CPP std::shared_ptr Communicator::connect(int remoteRank, int tag, Transport transport) { +MSCCLPP_API_CPP std::shared_ptr Communicator::connect(int remoteRank, int tag, Transport transport) +{ std::shared_ptr conn; if (transport == Transport::CudaIpc) { // sanity check: make sure the IPC connection is being made within a node if (pimpl->rankToHash_[remoteRank] != pimpl->rankToHash_[pimpl->bootstrap_->getRank()]) { std::stringstream ss; - ss << "Cuda IPC connection can only be made within a node: " << remoteRank << "(" << std::hex << pimpl->rankToHash_[pimpl->bootstrap_->getRank()] << ")" << " != " - << pimpl->bootstrap_->getRank() << "(" << std::hex << pimpl->rankToHash_[pimpl->bootstrap_->getRank()] << ")"; + ss << "Cuda IPC connection can only be made within a node: " << remoteRank << "(" << std::hex + << pimpl->rankToHash_[pimpl->bootstrap_->getRank()] << ")" + << " != " << pimpl->bootstrap_->getRank() << "(" << std::hex + << pimpl->rankToHash_[pimpl->bootstrap_->getRank()] << ")"; throw std::runtime_error(ss.str()); - } + } auto cudaIpcConn = std::make_shared(); conn = cudaIpcConn; - INFO(MSCCLPP_P2P, "Cuda IPC connection between rank %d(%lx) and remoteRank %d(%lx) created", pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], - remoteRank, pimpl->rankToHash_[remoteRank]); + INFO(MSCCLPP_P2P, "Cuda IPC connection between rank %d(%lx) and remoteRank %d(%lx) created", + pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], remoteRank, + pimpl->rankToHash_[remoteRank]); } else if (AllIBTransports.has(transport)) { auto ibConn = std::make_shared(remoteRank, tag, transport, *pimpl); conn = ibConn; - INFO(MSCCLPP_NET, "IB connection between rank %d(%lx) via %s and remoteRank %d(%lx) created", pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], - getIBDeviceName(transport).c_str(), remoteRank, pimpl->rankToHash_[remoteRank]); + INFO(MSCCLPP_NET, "IB connection between rank %d(%lx) via %s and remoteRank %d(%lx) created", + pimpl->bootstrap_->getRank(), pimpl->rankToHash_[pimpl->bootstrap_->getRank()], + getIBDeviceName(transport).c_str(), remoteRank, pimpl->rankToHash_[remoteRank]); } else { throw std::runtime_error("Unsupported transport"); } @@ -80,7 +94,8 @@ MSCCLPP_API_CPP std::shared_ptr Communicator::connect(int remoteRank return conn; } -MSCCLPP_API_CPP void Communicator::connectionSetup() { +MSCCLPP_API_CPP void Communicator::connectionSetup() +{ for (auto& conn : pimpl->connections) { conn->startSetup(pimpl->bootstrap_); } diff --git a/src/connection.cc b/src/connection.cc index 031f63ec..75a6ba79 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -1,12 +1,13 @@ #include "connection.hpp" #include "checks.hpp" -#include "registered_memory.hpp" -#include "npkit/npkit.h" #include "infiniband/verbs.h" +#include "npkit/npkit.h" +#include "registered_memory.hpp" namespace mscclpp { -void validateTransport(RegisteredMemory mem, Transport transport) { +void validateTransport(RegisteredMemory mem, Transport transport) +{ if (!mem.transports().has(transport)) { throw std::runtime_error("mem does not support transport"); } @@ -14,29 +15,36 @@ void validateTransport(RegisteredMemory mem, Transport transport) { // Connection -std::shared_ptr Connection::getRegisteredMemoryImpl(RegisteredMemory& mem) { +std::shared_ptr Connection::getRegisteredMemoryImpl(RegisteredMemory& mem) +{ return mem.pimpl; } // CudaIpcConnection -CudaIpcConnection::CudaIpcConnection() { +CudaIpcConnection::CudaIpcConnection() +{ cudaStreamCreate(&stream); } -CudaIpcConnection::~CudaIpcConnection() { +CudaIpcConnection::~CudaIpcConnection() +{ cudaStreamDestroy(stream); } -Transport CudaIpcConnection::transport() { +Transport CudaIpcConnection::transport() +{ return Transport::CudaIpc; } -Transport CudaIpcConnection::remoteTransport() { +Transport CudaIpcConnection::remoteTransport() +{ return Transport::CudaIpc; } -void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) { +void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) +{ validateTransport(dst, remoteTransport()); validateTransport(src, transport()); @@ -47,30 +55,38 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size); } -void CudaIpcConnection::flush() { +void CudaIpcConnection::flush() +{ CUDATHROW(cudaStreamSynchronize(stream)); // npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT); } // IBConnection -IBConnection::IBConnection(int remoteRank, int tag, Transport transport, Communicator::Impl& commImpl) : remoteRank_(remoteRank), tag_(tag), transport_(transport), remoteTransport_(Transport::Unknown) { +IBConnection::IBConnection(int remoteRank, int tag, Transport transport, Communicator::Impl& commImpl) + : remoteRank_(remoteRank), tag_(tag), transport_(transport), remoteTransport_(Transport::Unknown) +{ qp = commImpl.getIbContext(transport)->createQp(); } -IBConnection::~IBConnection() { +IBConnection::~IBConnection() +{ // TODO: Destroy QP? } -Transport IBConnection::transport() { +Transport IBConnection::transport() +{ return transport_; } -Transport IBConnection::remoteTransport() { +Transport IBConnection::remoteTransport() +{ return remoteTransport_; } -void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) { +void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) +{ validateTransport(dst, remoteTransport()); validateTransport(src, transport()); @@ -82,16 +98,18 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem if (!srcTransportInfo.ibLocal) { throw std::runtime_error("src is remote, which is not supported"); } - + auto dstMrInfo = dstTransportInfo.ibMrInfo; auto srcMr = srcTransportInfo.ibMr; - qp->stageSend(srcMr, dstMrInfo, (uint32_t)size, /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, /*signaled=*/false); + qp->stageSend(srcMr, dstMrInfo, (uint32_t)size, /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, + /*signaled=*/false); qp->postSend(); // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size); } -void IBConnection::flush() { +void IBConnection::flush() +{ bool isWaiting = true; while (isWaiting) { int wcNum = qp->pollCq(); @@ -114,11 +132,13 @@ void IBConnection::flush() { // npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT); } -void IBConnection::startSetup(std::shared_ptr bootstrap) { +void IBConnection::startSetup(std::shared_ptr bootstrap) +{ bootstrap->send(&qp->getInfo(), sizeof(qp->getInfo()), remoteRank_, tag_); } -void IBConnection::endSetup(std::shared_ptr bootstrap) { +void IBConnection::endSetup(std::shared_ptr bootstrap) +{ IbQpInfo qpInfo; bootstrap->recv(&qpInfo, sizeof(qpInfo), remoteRank_, tag_); qp->rtr(qpInfo); diff --git a/src/epoch.cc b/src/epoch.cc index 1fee307e..f6c82731 100644 --- a/src/epoch.cc +++ b/src/epoch.cc @@ -3,20 +3,25 @@ namespace mscclpp { -struct Epoch::Impl { +struct Epoch::Impl +{ DeviceEpoch deviceEpoch; - Impl() { + Impl() + { MSCCLPPTHROW(mscclppCudaCalloc(&deviceEpoch.localSignalEpochId, 1)); MSCCLPPTHROW(mscclppCudaCalloc(&deviceEpoch.waitEpochId, 1)); } - ~Impl() { + ~Impl() + { MSCCLPPTHROW(mscclppCudaFree(deviceEpoch.localSignalEpochId)); MSCCLPPTHROW(mscclppCudaFree(deviceEpoch.waitEpochId)); } }; -Epoch::Epoch() : pimpl(std::make_unique()) {} +Epoch::Epoch() : pimpl(std::make_unique()) +{ +} } // namespace mscclpp \ No newline at end of file diff --git a/src/fifo.cc b/src/fifo.cc index fe7f12d3..c2fdd738 100644 --- a/src/fifo.cc +++ b/src/fifo.cc @@ -1,13 +1,14 @@ -#include "mscclppfifo.hpp" #include "alloc.h" #include "checks.hpp" +#include "mscclppfifo.hpp" #include -#include #include +#include namespace mscclpp { -struct HostProxyFifo::Impl { +struct HostProxyFifo::Impl +{ DeviceProxyFifo deviceFifo; // allocated on the host. Only accessed by the host. This is a copy of the @@ -23,7 +24,8 @@ struct HostProxyFifo::Impl { cudaStream_t stream; }; -HostProxyFifo::HostProxyFifo() { +HostProxyFifo::HostProxyFifo() +{ pimpl = std::make_unique(); MSCCLPPTHROW(mscclppCudaCalloc(&pimpl->deviceFifo.head, 1)); MSCCLPPTHROW(mscclppCudaHostCalloc(&pimpl->deviceFifo.triggers, MSCCLPP_PROXY_FIFO_SIZE)); @@ -32,35 +34,40 @@ HostProxyFifo::HostProxyFifo() { pimpl->hostTail = 0; } -HostProxyFifo::~HostProxyFifo() { +HostProxyFifo::~HostProxyFifo() +{ MSCCLPPTHROW(mscclppCudaFree(pimpl->deviceFifo.head)); MSCCLPPTHROW(mscclppCudaHostFree(pimpl->deviceFifo.triggers)); MSCCLPPTHROW(mscclppCudaFree(pimpl->deviceFifo.tailReplica)); CUDATHROW(cudaStreamDestroy(pimpl->stream)); } -void HostProxyFifo::poll(ProxyTrigger *trigger) { +void HostProxyFifo::poll(ProxyTrigger* trigger) +{ __m128i xmm0 = _mm_load_si128((__m128i*)&pimpl->deviceFifo.triggers[pimpl->hostTail % MSCCLPP_PROXY_FIFO_SIZE]); _mm_store_si128((__m128i*)trigger, xmm0); } -void HostProxyFifo::pop() { +void HostProxyFifo::pop() +{ *(volatile uint64_t*)(&pimpl->deviceFifo.triggers[pimpl->hostTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0; (pimpl->hostTail)++; } -void HostProxyFifo::flushTail(bool sync) { +void HostProxyFifo::flushTail(bool sync) +{ // Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush // request. - CUDATHROW( - cudaMemcpyAsync(pimpl->deviceFifo.tailReplica, &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream)); + CUDATHROW(cudaMemcpyAsync(pimpl->deviceFifo.tailReplica, &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, + pimpl->stream)); if (sync) { CUDATHROW(cudaStreamSynchronize(pimpl->stream)); } } -DeviceProxyFifo HostProxyFifo::toDevice() { +DeviceProxyFifo HostProxyFifo::toDevice() +{ return pimpl->deviceFifo; } diff --git a/src/host_connection.cc b/src/host_connection.cc index 72e11ffc..e33069e2 100644 --- a/src/host_connection.cc +++ b/src/host_connection.cc @@ -1,52 +1,64 @@ #include "host_connection.hpp" -#include "communicator.hpp" +#include "api.h" #include "comm.h" +#include "communicator.hpp" #include "mscclpp.h" #include "mscclppfifo.h" -#include "api.h" namespace mscclpp { -HostConnection::Impl::Impl(Communicator* comm, mscclppConn* conn) : comm(comm), conn(conn) { +HostConnection::Impl::Impl(Communicator* comm, mscclppConn* conn) : comm(comm), conn(conn) +{ this->hostConn = conn->hostConn; } -HostConnection::Impl::~Impl() { +HostConnection::Impl::~Impl() +{ // TODO: figure out memory ownership. Does this deallocate the mscclppHostConn? Likely not. } MSCCLPP_API_CPP HostConnection::~HostConnection() = default; -MSCCLPP_API_CPP HostConnection::HostConnection(std::unique_ptr p) : pimpl(std::move(p)) {} +MSCCLPP_API_CPP HostConnection::HostConnection(std::unique_ptr p) : pimpl(std::move(p)) +{ +} -MSCCLPP_API_CPP int HostConnection::getId() { +MSCCLPP_API_CPP int HostConnection::getId() +{ return pimpl->conn->connId; } -MSCCLPP_API_CPP BufferHandle HostConnection::registerBuffer(void* data, uint64_t size) { +MSCCLPP_API_CPP BufferHandle HostConnection::registerBuffer(void* data, uint64_t size) +{ BufferHandle result; static_assert(sizeof(BufferHandle) == sizeof(mscclppBufferHandle_t)); - mscclppRegisterBufferForConnection(pimpl->comm->pimpl->comm, pimpl->conn->connId, data, size, reinterpret_cast(&result)); + mscclppRegisterBufferForConnection(pimpl->comm->pimpl->comm, pimpl->conn->connId, data, size, + reinterpret_cast(&result)); return result; } -MSCCLPP_API_CPP int HostConnection::numLocalBuffers() { +MSCCLPP_API_CPP int HostConnection::numLocalBuffers() +{ return pimpl->conn->bufferRegistrations.size() - 1; } -MSCCLPP_API_CPP BufferHandle HostConnection::getLocalBuffer(int index) { +MSCCLPP_API_CPP BufferHandle HostConnection::getLocalBuffer(int index) +{ return index + 1; } -MSCCLPP_API_CPP int HostConnection::numRemoteBuffers() { +MSCCLPP_API_CPP int HostConnection::numRemoteBuffers() +{ return pimpl->conn->remoteBufferRegistrations.size() - 1; } -MSCCLPP_API_CPP BufferHandle HostConnection::getRemoteBuffer(int index) { +MSCCLPP_API_CPP BufferHandle HostConnection::getRemoteBuffer(int index) +{ return index + 1; } -MSCCLPP_API_CPP ConnectionEpoch HostConnection::getEpoch() { +MSCCLPP_API_CPP ConnectionEpoch HostConnection::getEpoch() +{ ConnectionEpoch epoch; static_assert(sizeof(SignalEpochId) == sizeof(mscclppDevConnSignalEpochId)); epoch.localSignalEpochId = reinterpret_cast(pimpl->conn->devConn->localSignalEpochId); @@ -55,24 +67,29 @@ MSCCLPP_API_CPP ConnectionEpoch HostConnection::getEpoch() { return epoch; } - -MSCCLPP_API_CPP DeviceProxyFifo HostConnection::getDeviceFifo() { +MSCCLPP_API_CPP DeviceProxyFifo HostConnection::getDeviceFifo() +{ return pimpl->comm->pimpl->proxy.fifo().toDevice(); } -MSCCLPP_API_CPP void HostConnection::put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size) { +MSCCLPP_API_CPP void HostConnection::put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, + uint64_t size) +{ pimpl->hostConn->put(dst, dstOffset, src, srcOffset, size); } -MSCCLPP_API_CPP void HostConnection::signal() { +MSCCLPP_API_CPP void HostConnection::signal() +{ pimpl->hostConn->signal(); } -MSCCLPP_API_CPP void HostConnection::flush() { +MSCCLPP_API_CPP void HostConnection::flush() +{ pimpl->hostConn->flush(); } -MSCCLPP_API_CPP void HostConnection::wait() { +MSCCLPP_API_CPP void HostConnection::wait() +{ pimpl->hostConn->wait(); } diff --git a/src/ib.cc b/src/ib.cc index 88d14d8e..ec7e95f2 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -1,16 +1,16 @@ #include #include #include -#include #include +#include #include -#include "mscclpp.hpp" #include "alloc.h" +#include "checks.hpp" #include "comm.h" #include "debug.h" #include "ib.hpp" -#include "checks.hpp" +#include "mscclpp.hpp" #include #include @@ -28,7 +28,9 @@ IbMr::IbMr(void* pd, void* buff, std::size_t size) : buff(buff) uintptr_t addr = reinterpret_cast(buff) & -pageSize; std::size_t pages = (size + (reinterpret_cast(buff) - addr) + pageSize - 1) / pageSize; struct ibv_pd* _pd = reinterpret_cast(pd); - struct ibv_mr* _mr = ibv_reg_mr(_pd, reinterpret_cast(addr), pages * pageSize, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING); + struct ibv_mr* _mr = + ibv_reg_mr(_pd, reinterpret_cast(addr), pages * pageSize, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING); if (_mr == nullptr) { std::stringstream err; err << "ibv_reg_mr failed (errno " << errno << ")"; @@ -164,7 +166,9 @@ void IbQp::rtr(const IbQpInfo& info) qp_attr.ah_attr.sl = 0; qp_attr.ah_attr.src_path_bits = 0; qp_attr.ah_attr.port_num = info.port; - int ret = ibv_modify_qp(reinterpret_cast(this->qp), &qp_attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER); + int ret = ibv_modify_qp(reinterpret_cast(this->qp), &qp_attr, + IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER); if (ret != 0) { std::stringstream err; err << "ibv_modify_qp failed (errno " << errno << ")"; @@ -182,7 +186,9 @@ void IbQp::rts() qp_attr.rnr_retry = 7; qp_attr.sq_psn = 0; qp_attr.max_rd_atomic = 1; - int ret = ibv_modify_qp(reinterpret_cast(this->qp), &qp_attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC); + int ret = ibv_modify_qp(reinterpret_cast(this->qp), &qp_attr, + IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); if (ret != 0) { std::stringstream err; err << "ibv_modify_qp failed (errno " << errno << ")"; @@ -190,7 +196,8 @@ void IbQp::rts() } } -int IbQp::stageSend(const IbMr *mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled) +int IbQp::stageSend(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, + uint64_t dstOffset, bool signaled) { if (this->wrn >= MSCCLPP_IB_MAX_SENDS) { return -1; @@ -219,7 +226,8 @@ int IbQp::stageSend(const IbMr *mr, const IbMrInfo& info, uint32_t size, uint64_ return this->wrn; } -int IbQp::stageSendWithImm(const IbMr *mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled, unsigned int immData) +int IbQp::stageSendWithImm(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, + uint64_t dstOffset, bool signaled, unsigned int immData) { int wrn = this->stageSend(mr, info, size, wrId, srcOffset, dstOffset, signaled); struct ibv_send_wr* wrs_ = reinterpret_cast(this->wrs); @@ -234,7 +242,8 @@ void IbQp::postSend() return; } struct ibv_send_wr* bad_wr; - int ret = ibv_post_send(reinterpret_cast(this->qp), reinterpret_cast(this->wrs), &bad_wr); + int ret = ibv_post_send(reinterpret_cast(this->qp), reinterpret_cast(this->wrs), + &bad_wr); if (ret != 0) { std::stringstream err; err << "ibv_post_send failed (errno " << errno << ")"; @@ -260,7 +269,8 @@ void IbQp::postRecv(uint64_t wrId) int IbQp::pollCq() { - return ibv_poll_cq(reinterpret_cast(this->cq), MSCCLPP_IB_CQ_POLL_NUM, reinterpret_cast(this->wcs)); + return ibv_poll_cq(reinterpret_cast(this->cq), MSCCLPP_IB_CQ_POLL_NUM, + reinterpret_cast(this->wcs)); } IbQpInfo& IbQp::getInfo() @@ -317,8 +327,8 @@ bool IbCtx::isPortUsable(int port) const err << "ibv_query_port failed (errno " << errno << ", port << " << port << ")"; throw std::runtime_error(err.str()); } - return portAttr.state == IBV_PORT_ACTIVE && (portAttr.link_layer == IBV_LINK_LAYER_ETHERNET || - portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND); + return portAttr.state == IBV_PORT_ACTIVE && + (portAttr.link_layer == IBV_LINK_LAYER_ETHERNET || portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND); } int IbCtx::getAnyActivePort() const @@ -362,43 +372,45 @@ const std::string& IbCtx::getDevName() const return this->devName; } -int getIBDeviceCount() { +int getIBDeviceCount() +{ int num; ibv_get_device_list(&num); return num; } -std::string getIBDeviceName(Transport ibTransport) { +std::string getIBDeviceName(Transport ibTransport) +{ int num; struct ibv_device** devices = ibv_get_device_list(&num); int ibTransportIndex; switch (ibTransport) { // TODO: get rid of this ugly switch - case Transport::IB0: - ibTransportIndex = 0; - break; - case Transport::IB1: - ibTransportIndex = 1; - break; - case Transport::IB2: - ibTransportIndex = 2; - break; - case Transport::IB3: - ibTransportIndex = 3; - break; - case Transport::IB4: - ibTransportIndex = 4; - break; - case Transport::IB5: - ibTransportIndex = 5; - break; - case Transport::IB6: - ibTransportIndex = 6; - break; - case Transport::IB7: - ibTransportIndex = 7; - break; - default: - throw std::runtime_error("Not an IB transport"); + case Transport::IB0: + ibTransportIndex = 0; + break; + case Transport::IB1: + ibTransportIndex = 1; + break; + case Transport::IB2: + ibTransportIndex = 2; + break; + case Transport::IB3: + ibTransportIndex = 3; + break; + case Transport::IB4: + ibTransportIndex = 4; + break; + case Transport::IB5: + ibTransportIndex = 5; + break; + case Transport::IB6: + ibTransportIndex = 6; + break; + case Transport::IB7: + ibTransportIndex = 7; + break; + default: + throw std::runtime_error("Not an IB transport"); } if (ibTransportIndex >= num) { throw std::runtime_error("IB transport out of range"); @@ -406,30 +418,31 @@ std::string getIBDeviceName(Transport ibTransport) { return devices[ibTransportIndex]->name; } -Transport getIBTransportByDeviceName(const std::string& ibDeviceName) { +Transport getIBTransportByDeviceName(const std::string& ibDeviceName) +{ int num; struct ibv_device** devices = ibv_get_device_list(&num); for (int i = 0; i < num; ++i) { if (ibDeviceName == devices[i]->name) { switch (i) { // TODO: get rid of this ugly switch - case 0: - return Transport::IB0; - case 1: - return Transport::IB1; - case 2: - return Transport::IB2; - case 3: - return Transport::IB3; - case 4: - return Transport::IB4; - case 5: - return Transport::IB5; - case 6: - return Transport::IB6; - case 7: - return Transport::IB7; - default: - throw std::runtime_error("IB device index out of range"); + case 0: + return Transport::IB0; + case 1: + return Transport::IB1; + case 2: + return Transport::IB2; + case 3: + return Transport::IB3; + case 4: + return Transport::IB4; + case 5: + return Transport::IB5; + case 6: + return Transport::IB6; + case 7: + return Transport::IB7; + default: + throw std::runtime_error("IB device index out of range"); } } } diff --git a/src/include/basic_proxy_handler.hpp b/src/include/basic_proxy_handler.hpp index 1c4b3f86..58e41930 100644 --- a/src/include/basic_proxy_handler.hpp +++ b/src/include/basic_proxy_handler.hpp @@ -1,12 +1,12 @@ #ifndef MSCCLPP_BASIC_PROXY_SERVICE_HPP_ #define MSCCLPP_BASIC_PROXY_SERVICE_HPP_ -#include "mscclpp.hpp" #include "communicator.hpp" +#include "mscclpp.hpp" namespace mscclpp { -ProxyHandler makeBasicProxyHandler(Communicator::Impl &comm); +ProxyHandler makeBasicProxyHandler(Communicator::Impl& comm); } diff --git a/src/include/channel.hpp b/src/include/channel.hpp index 10a5f601..2303a57c 100644 --- a/src/include/channel.hpp +++ b/src/include/channel.hpp @@ -1,8 +1,8 @@ #ifndef MSCCLPP_CHANNEL_HPP_ #define MSCCLPP_CHANNEL_HPP_ -#include "mscclpp.hpp" #include "epoch.hpp" +#include "mscclpp.hpp" #include "proxy.hpp" namespace mscclpp { @@ -18,7 +18,7 @@ const ChannelTriggerType channelTriggerFlag = 0x2; const ChannelTriggerType channelTriggerSync = 0x4; // This is just a numeric ID. Each HostConnection will have an internal array indexed by these handles -// mapping to the actual +// mapping to the actual using BufferHandle = uint32_t; #define MSCCLPP_BITS_SIZE 32 @@ -43,20 +43,32 @@ union ChannelTrigger { uint64_t dstBufferHandle : MSCCLPP_BITS_BUFFER_HANDLE; uint64_t type : MSCCLPP_BITS_TYPE; uint64_t connId : MSCCLPP_BITS_CONNID; - uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_BUFFER_HANDLE - MSCCLPP_BITS_BUFFER_HANDLE - MSCCLPP_BITS_TYPE); // ensure 64-bit alignment + uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_BUFFER_HANDLE - MSCCLPP_BITS_BUFFER_HANDLE - + MSCCLPP_BITS_TYPE); // ensure 64-bit alignment } fields; #ifdef __CUDACC__ - __device__ ChannelTrigger() {} - __device__ ChannelTrigger(ProxyTrigger value) : value(value) {} - __device__ ChannelTrigger(ChannelTriggerType type, BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size, int connectionId) { + __device__ ChannelTrigger() + { + } + __device__ ChannelTrigger(ProxyTrigger value) : value(value) + { + } + __device__ ChannelTrigger(ChannelTriggerType type, BufferHandle dst, uint64_t dstOffset, BufferHandle src, + uint64_t srcOffset, uint64_t size, int connectionId) + { value.fst = ((srcOffset << MSCCLPP_BITS_SIZE) + size); - value.snd = ((((((((connectionId << MSCCLPP_BITS_TYPE) + (uint64_t)type) << MSCCLPP_BITS_BUFFER_HANDLE) + dst) << MSCCLPP_BITS_BUFFER_HANDLE) + src) << MSCCLPP_BITS_OFFSET) + dstOffset); + value.snd = ((((((((connectionId << MSCCLPP_BITS_TYPE) + (uint64_t)type) << MSCCLPP_BITS_BUFFER_HANDLE) + dst) + << MSCCLPP_BITS_BUFFER_HANDLE) + + src) + << MSCCLPP_BITS_OFFSET) + + dstOffset); } #endif // __CUDACC__ }; -struct ConnectionEpoch { +struct ConnectionEpoch +{ #ifdef __CUDACC__ __forceinline__ __device__ void wait() { @@ -81,8 +93,10 @@ struct ConnectionEpoch { uint64_t* waitEpochId; }; -class HostConnection { +class HostConnection +{ struct Impl; + public: /* HostConnection can not be constructed from user code and must instead be created through Communicator::connect */ HostConnection(std::unique_ptr); @@ -103,7 +117,7 @@ public: * * Inputs: * index: the index of the handle to get - * + * * Returns: a handle to the buffer */ BufferHandle getLocalBuffer(int index); @@ -118,7 +132,7 @@ public: * * Inputs: * index: the index of the handle to get - * + * * Returns: a handle to the buffer on the remote peer */ BufferHandle getRemoteBuffer(int index); @@ -140,19 +154,22 @@ private: friend class Communicator; }; -struct DeviceConnection { +struct DeviceConnection +{ DeviceConnection() = default; DeviceConnection(HostConnection& hostConn) - : connectionId(hostConn.getId()), epoch(hostConn.getEpoch()), - fifo(hostConn.getDeviceFifo()) {} + : connectionId(hostConn.getId()), epoch(hostConn.getEpoch()), fifo(hostConn.getDeviceFifo()) + { + } DeviceConnection(const DeviceConnection& other) = default; DeviceConnection& operator=(DeviceConnection& other) = default; #ifdef __CUDACC__ - __forceinline__ __device__ void put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size) + __forceinline__ __device__ void put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, + uint64_t size) { fifo.push(ChannelTrigger(channelTriggerData, dst, dstOffset, src, srcOffset, size, connectionId).value); } @@ -168,10 +185,13 @@ struct DeviceConnection { fifo.push(ChannelTrigger(channelTriggerFlag, 0, 0, 0, 0, 1, connectionId).value); } - __forceinline__ __device__ void putWithSignal(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size) + __forceinline__ __device__ void putWithSignal(BufferHandle dst, uint64_t dstOffset, BufferHandle src, + uint64_t srcOffset, uint64_t size) { epochIncrement(); - fifo.push(ChannelTrigger(channelTriggerData | channelTriggerFlag, dst, dstOffset, src, srcOffset, size, connectionId).value); + fifo.push( + ChannelTrigger(channelTriggerData | channelTriggerFlag, dst, dstOffset, src, srcOffset, size, connectionId) + .value); } __forceinline__ __device__ void putWithSignal(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size) @@ -179,16 +199,20 @@ struct DeviceConnection { putWithSignal(dst, offset, src, offset, size); } - __forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size) + __forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, uint64_t dstOffset, BufferHandle src, + uint64_t srcOffset, uint64_t size) { epochIncrement(); - uint64_t curFifoHead = fifo.push(ChannelTrigger(channelTriggerData | channelTriggerFlag | channelTriggerSync, dst, dstOffset, src, srcOffset, size, connectionId).value); + uint64_t curFifoHead = fifo.push(ChannelTrigger(channelTriggerData | channelTriggerFlag | channelTriggerSync, dst, + dstOffset, src, srcOffset, size, connectionId) + .value); while (*(volatile uint64_t*)&fifo.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 && *(volatile uint64_t*)fifo.tailReplica <= curFifoHead) ; } - __forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size) + __forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, BufferHandle src, uint64_t offset, + uint64_t size) { putWithSignalAndFlush(dst, offset, src, offset, size); } @@ -223,10 +247,12 @@ struct DeviceConnection { DeviceProxyFifo fifo; }; -struct SimpleDeviceConnection { +struct SimpleDeviceConnection +{ SimpleDeviceConnection() = default; - SimpleDeviceConnection(HostConnection& hostConn) : devConn(hostConn) { + SimpleDeviceConnection(HostConnection& hostConn) : devConn(hostConn) + { dst = hostConn.getRemoteBuffer(0); src = hostConn.getLocalBuffer(0); } diff --git a/src/include/checks.hpp b/src/include/checks.hpp index ad985e76..69b222ee 100644 --- a/src/include/checks.hpp +++ b/src/include/checks.hpp @@ -8,6 +8,7 @@ #define MSCCLPP_CHECKS_HPP_ #include "debug.h" +#include #include #define MSCCLPPTHROW(call) \ @@ -26,4 +27,14 @@ } \ } while (false) +#define CUTHROW(cmd) \ + do { \ + CUresult err = cmd; \ + if (err != CUDA_SUCCESS) { \ + const char* errStr; \ + cuGetErrorString(err, &errStr); \ + throw std::runtime_error(std::string("Cu failure '") + std::string(errStr) + "'"); \ + } \ + } while (false) + #endif diff --git a/src/include/comm.h b/src/include/comm.h index dce724fa..e6a067d6 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -9,14 +9,14 @@ #include "ib.hpp" #include "proxy.h" -#include #include +#include #define MAXCONNECTIONS 64 struct mscclppBufferRegistration { - void *data; + void* data; uint64_t size; }; diff --git a/src/include/communicator.hpp b/src/include/communicator.hpp index e8e274b9..25fface7 100644 --- a/src/include/communicator.hpp +++ b/src/include/communicator.hpp @@ -1,19 +1,20 @@ #ifndef MSCCL_COMMUNICATOR_HPP_ #define MSCCL_COMMUNICATOR_HPP_ -#include "mscclpp.hpp" -#include "mscclpp.h" #include "channel.hpp" -#include "proxy.hpp" #include "ib.hpp" -#include +#include "mscclpp.h" +#include "mscclpp.hpp" +#include "proxy.hpp" #include +#include namespace mscclpp { class ConnectionBase; -struct Communicator::Impl { +struct Communicator::Impl +{ mscclppComm_t comm; std::vector> connections; std::unordered_map> ibContexts; diff --git a/src/include/connection.hpp b/src/include/connection.hpp index bd08802c..f957c8a1 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -1,25 +1,27 @@ #ifndef MSCCLPP_CONNECTION_HPP_ #define MSCCLPP_CONNECTION_HPP_ +#include "communicator.hpp" +#include "ib.hpp" #include "mscclpp.hpp" #include -#include "ib.hpp" -#include "communicator.hpp" namespace mscclpp { // TODO: Add functionality to these classes for Communicator to do connectionSetup -class ConnectionBase : public Connection { +class ConnectionBase : public Connection +{ public: - virtual void startSetup(std::shared_ptr bootstrap) {}; - virtual void endSetup(std::shared_ptr bootstrap) {}; + virtual void startSetup(std::shared_ptr bootstrap){}; + virtual void endSetup(std::shared_ptr bootstrap){}; }; -class CudaIpcConnection : public ConnectionBase { +class CudaIpcConnection : public ConnectionBase +{ cudaStream_t stream; -public: +public: CudaIpcConnection(); ~CudaIpcConnection(); @@ -28,19 +30,21 @@ public: Transport remoteTransport() override; - void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; + void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) override; void flush() override; }; -class IBConnection : public ConnectionBase { +class IBConnection : public ConnectionBase +{ int remoteRank_; int tag_; Transport transport_; Transport remoteTransport_; IbQp* qp; -public: +public: IBConnection(int remoteRank, int tag, Transport transport, Communicator::Impl& commImpl); ~IBConnection(); @@ -49,7 +53,8 @@ public: Transport remoteTransport() override; - void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; + void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) override; void flush() override; diff --git a/src/include/epoch.hpp b/src/include/epoch.hpp index 942edd8b..fd25b51f 100644 --- a/src/include/epoch.hpp +++ b/src/include/epoch.hpp @@ -5,7 +5,8 @@ namespace mscclpp { -struct alignas(16) SignalEpochId { +struct alignas(16) SignalEpochId +{ // every signal(), increaments this and either: // 1) proxy thread pushes it to the remote peer's localSignalEpochId->proxy // 2) gpu thread directly writes it to remoteSignalEpochId->device @@ -14,7 +15,8 @@ struct alignas(16) SignalEpochId { uint64_t proxy; }; -struct DeviceEpoch { +struct DeviceEpoch +{ #ifdef __CUDACC__ __forceinline__ __device__ void wait() { @@ -34,10 +36,11 @@ struct DeviceEpoch { uint64_t* waitEpochId; }; - -class Epoch { +class Epoch +{ struct Impl; std::unique_ptr pimpl; + public: Epoch(); ~Epoch(); diff --git a/src/include/host_connection.hpp b/src/include/host_connection.hpp index 495130d9..8ac5d9f1 100644 --- a/src/include/host_connection.hpp +++ b/src/include/host_connection.hpp @@ -1,13 +1,14 @@ #ifndef MSCCLPP_HOST_CONNECTION_HPP_ #define MSCCLPP_HOST_CONNECTION_HPP_ -#include "mscclpp.hpp" -#include "mscclpp.h" #include "comm.h" +#include "mscclpp.h" +#include "mscclpp.hpp" namespace mscclpp { -struct HostConnection::Impl { +struct HostConnection::Impl +{ Communicator* comm; mscclppConn* conn; mscclppHostConn_t* hostConn; diff --git a/src/include/ib.hpp b/src/include/ib.hpp index b1baeb75..78d31ce6 100644 --- a/src/include/ib.hpp +++ b/src/include/ib.hpp @@ -1,9 +1,9 @@ #ifndef MSCCLPP_IB_HPP_ #define MSCCLPP_IB_HPP_ +#include #include #include -#include #define MSCCLPP_IB_CQ_SIZE 1024 #define MSCCLPP_IB_CQ_POLL_NUM 1 @@ -55,8 +55,10 @@ public: void rtr(const IbQpInfo& info); void rts(); - int stageSend(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled); - int stageSendWithImm(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, uint64_t dstOffset, bool signaled, unsigned int immData); + int stageSend(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, + uint64_t dstOffset, bool signaled); + int stageSendWithImm(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, + uint64_t dstOffset, bool signaled, unsigned int immData); void postSend(); void postRecv(uint64_t wrId); int pollCq(); diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index c01246ab..4789b80f 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -191,7 +191,8 @@ struct mscclppHostConn { virtual ~mscclppHostConn() = default; virtual void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) = 0; - virtual void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, uint64_t dataSize) = 0; + virtual void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, + uint64_t dataSize) = 0; virtual void signal() = 0; virtual void wait() = 0; virtual void flush() = 0; @@ -232,7 +233,6 @@ typedef enum mscclppNumResults = 8 } mscclppResult_t; - /* Create a unique ID for communication. Only needs to be called by one process. * Use with mscclppCommInitRankFromId(). * All processes need to provide the same ID to mscclppCommInitRankFromId(). @@ -343,7 +343,8 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void * transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB) * ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P. */ -mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, mscclppTransport_t transportType, const char* ibDev = 0); +mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, + mscclppTransport_t transportType, const char* ibDev = 0); /* Register a buffer for use with a connection. * @@ -356,7 +357,8 @@ mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, * Outputs: * handle: a handle to the buffer registration */ -mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, uint64_t buffSize, mscclppBufferHandle_t *handle); +mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, uint64_t buffSize, + mscclppBufferHandle_t* handle); /* Establish all connections declared by mscclppConnect(). This function must be called after all mscclppConnect() * calls are made. This function ensures that all remote ranks are ready to communicate when it returns. diff --git a/src/include/mscclpp.hpp b/src/include/mscclpp.hpp index 3b9c6d8d..8a85ebc6 100644 --- a/src/include/mscclpp.hpp +++ b/src/include/mscclpp.hpp @@ -6,16 +6,16 @@ #define MSCCLPP_PATCH 0 #define MSCCLPP_VERSION (MSCCLPP_MAJOR * 10000 + MSCCLPP_MINOR * 100 + MSCCLPP_PATCH) -#include +#include #include #include -#include - +#include namespace mscclpp { #define MSCCLPP_UNIQUE_ID_BYTES 128 -struct UniqueId { +struct UniqueId +{ char internal[MSCCLPP_UNIQUE_ID_BYTES]; }; @@ -64,7 +64,8 @@ private: */ std::unique_ptr getUniqueId(); -enum class Transport { +enum class Transport +{ Unknown, CudaIpc, IB0, @@ -79,109 +80,137 @@ enum class Transport { }; namespace detail { - const size_t TransportFlagsSize = 10; - static_assert(TransportFlagsSize == static_cast(Transport::NumTransports), "TransportFlagsSize must match the number of transports"); - using TransportFlagsBase = std::bitset; -} +const size_t TransportFlagsSize = 10; +static_assert(TransportFlagsSize == static_cast(Transport::NumTransports), + "TransportFlagsSize must match the number of transports"); +using TransportFlagsBase = std::bitset; +} // namespace detail -class TransportFlags : private detail::TransportFlagsBase { +class TransportFlags : private detail::TransportFlagsBase +{ public: TransportFlags() = default; - TransportFlags(Transport transport) : detail::TransportFlagsBase(1 << static_cast(transport)) {} + TransportFlags(Transport transport) : detail::TransportFlagsBase(1 << static_cast(transport)) + { + } - bool has(Transport transport) const { + bool has(Transport transport) const + { return detail::TransportFlagsBase::test(static_cast(transport)); } - bool none() const { + bool none() const + { return detail::TransportFlagsBase::none(); } - bool any() const { + bool any() const + { return detail::TransportFlagsBase::any(); } - bool all() const { + bool all() const + { return detail::TransportFlagsBase::all(); } - size_t count() const { + size_t count() const + { return detail::TransportFlagsBase::count(); } - TransportFlags& operator|=(TransportFlags other) { + TransportFlags& operator|=(TransportFlags other) + { detail::TransportFlagsBase::operator|=(other); return *this; } - TransportFlags operator|(TransportFlags other) const { + TransportFlags operator|(TransportFlags other) const + { return TransportFlags(*this) |= other; } - TransportFlags operator|(Transport transport) const { + TransportFlags operator|(Transport transport) const + { return *this | TransportFlags(transport); } - TransportFlags& operator&=(TransportFlags other) { + TransportFlags& operator&=(TransportFlags other) + { detail::TransportFlagsBase::operator&=(other); return *this; } - TransportFlags operator&(TransportFlags other) const { + TransportFlags operator&(TransportFlags other) const + { return TransportFlags(*this) &= other; } - TransportFlags operator&(Transport transport) const { + TransportFlags operator&(Transport transport) const + { return *this & TransportFlags(transport); } - TransportFlags& operator^=(TransportFlags other) { + TransportFlags& operator^=(TransportFlags other) + { detail::TransportFlagsBase::operator^=(other); return *this; } - TransportFlags operator^(TransportFlags other) const { + TransportFlags operator^(TransportFlags other) const + { return TransportFlags(*this) ^= other; } - TransportFlags operator^(Transport transport) const { + TransportFlags operator^(Transport transport) const + { return *this ^ TransportFlags(transport); } - TransportFlags operator~() const { + TransportFlags operator~() const + { return TransportFlags(*this).flip(); } - bool operator==(TransportFlags other) const { + bool operator==(TransportFlags other) const + { return detail::TransportFlagsBase::operator==(other); } - bool operator!=(TransportFlags other) const { + bool operator!=(TransportFlags other) const + { return detail::TransportFlagsBase::operator!=(other); } - detail::TransportFlagsBase toBitset() const { + detail::TransportFlagsBase toBitset() const + { return *this; } private: - TransportFlags(detail::TransportFlagsBase bitset) : detail::TransportFlagsBase(bitset) {} + TransportFlags(detail::TransportFlagsBase bitset) : detail::TransportFlagsBase(bitset) + { + } }; -inline TransportFlags operator|(Transport transport1, Transport transport2) { +inline TransportFlags operator|(Transport transport1, Transport transport2) +{ return TransportFlags(transport1) | transport2; } -inline TransportFlags operator&(Transport transport1, Transport transport2) { +inline TransportFlags operator&(Transport transport1, Transport transport2) +{ return TransportFlags(transport1) & transport2; } -inline TransportFlags operator^(Transport transport1, Transport transport2) { +inline TransportFlags operator^(Transport transport1, Transport transport2) +{ return TransportFlags(transport1) ^ transport2; } const TransportFlags NoTransports = TransportFlags(); -const TransportFlags AllIBTransports = Transport::IB0 | Transport::IB1 | Transport::IB2 | Transport::IB3 | Transport::IB4 | Transport::IB5 | Transport::IB6 | Transport::IB7; +const TransportFlags AllIBTransports = Transport::IB0 | Transport::IB1 | Transport::IB2 | Transport::IB3 | + Transport::IB4 | Transport::IB5 | Transport::IB6 | Transport::IB7; const TransportFlags AllTransports = AllIBTransports | Transport::CudaIpc; int getIBDeviceCount(); @@ -191,11 +220,12 @@ Transport getIBTransportByDeviceName(const std::string& ibDeviceName); class Communicator; class Connection; -class RegisteredMemory { +class RegisteredMemory +{ struct Impl; std::shared_ptr pimpl; -public: +public: RegisteredMemory(std::shared_ptr pimpl); ~RegisteredMemory(); @@ -211,9 +241,11 @@ public: friend class Communicator; }; -class Connection { +class Connection +{ public: - virtual void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) = 0; + virtual void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint64_t size) = 0; virtual void flush() = 0; @@ -225,24 +257,24 @@ protected: static std::shared_ptr getRegisteredMemoryImpl(RegisteredMemory&); }; -class Communicator { +class Communicator +{ public: /* Initialize the communicator. - * - * Inputs: - * bootstrap: an implementation of the of BaseBootstrap that the communicator will use - */ + * + * Inputs: + * bootstrap: an implementation of the of BaseBootstrap that the communicator will use + */ Communicator(std::shared_ptr bootstrap); - ~Communicator(); - + /* Ring-based AllGather through the bootstrap socket. - * - * Inputs: - * data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r` - * size: data size per rank - */ + * + * Inputs: + * data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r` + * size: data size per rank + */ void bootstrapAllGather(void* data, int size); /* A no-op function that is used to synchronize all processes via a bootstrap allgather*/ @@ -253,33 +285,34 @@ public: * Inputs: * data: base pointer to the memory * size: size of the memory region in bytes - * + * * Returns: a handle to the buffer */ RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); /* Connect to a remote rank. This function only prepares metadata for connection. The actual connection - * is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection - * from rank i to remote rank j needs to have a counterpart from rank j to rank i. - * Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages - * and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has - * security risks if the devConn's accesses are given to a malicious process. - * - * Inputs: - * remoteRank: the rank of the remote process - * tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be - * used to identify the connection inside a GPU kernel. - * transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB) - * ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P. - */ + * is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection + * from rank i to remote rank j needs to have a counterpart from rank j to rank i. + * Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages + * and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has + * security risks if the devConn's accesses are given to a malicious process. + * + * Inputs: + * remoteRank: the rank of the remote process + * tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be + * used to identify the connection inside a GPU kernel. + * transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB) + * ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P. + */ std::shared_ptr connect(int remoteRank, int tag, Transport transport); /* Establish all connections declared by connect(). This function must be called after all connect() - * calls are made. This function ensures that all remote ranks are ready to communicate when it returns. - */ + * calls are made. This function ensures that all remote ranks are ready to communicate when it returns. + */ void connectionSetup(); struct Impl; + private: std::unique_ptr pimpl; }; @@ -287,12 +320,13 @@ private: } // namespace mscclpp namespace std { - template <> - struct hash { - size_t operator()(const mscclpp::TransportFlags& flags) const { - return hash()(flags.toBitset()); - } - }; -} +template <> struct hash +{ + size_t operator()(const mscclpp::TransportFlags& flags) const + { + return hash()(flags.toBitset()); + } +}; +} // namespace std #endif // MSCCLPP_H_ diff --git a/src/include/mscclppfifo.hpp b/src/include/mscclppfifo.hpp index b5f8ba4c..7e2820b0 100644 --- a/src/include/mscclppfifo.hpp +++ b/src/include/mscclppfifo.hpp @@ -1,13 +1,14 @@ #ifndef MSCCLPPFIFO_HPP_ #define MSCCLPPFIFO_HPP_ -#include #include #include +#include namespace mscclpp { -struct alignas(16) ProxyTrigger { +struct alignas(16) ProxyTrigger +{ uint64_t fst, snd; }; @@ -24,7 +25,8 @@ struct alignas(16) ProxyTrigger { * Why duplicating the tail is a good idea? The fifo is large engouh and we do not need frequent updates * for the tail as there is usually enough space for device threads to push their work into. */ -struct DeviceProxyFifo { +struct DeviceProxyFifo +{ #ifdef __CUDACC__ __forceinline__ __device__ uint64_t push(ProxyTrigger trigger) { @@ -34,29 +36,28 @@ struct DeviceProxyFifo { while (*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0) ; ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]); - asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), - "l"(trigger.fst), "l"(trigger.snd)); + asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd)); return curFifoHead; } #endif // __CUDACC__ ProxyTrigger* triggers; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements - uint64_t* tailReplica; // Allocated on device. proxyState->fifoTailHost is the true tail on host and pused - // occasionally to device - uint64_t* head; // Allocated on device. Only accessed by device + uint64_t* tailReplica; // Allocated on device. proxyState->fifoTailHost is the true tail on host and pused + // occasionally to device + uint64_t* head; // Allocated on device. Only accessed by device }; class HostProxyFifo { public: HostProxyFifo(); - + ~HostProxyFifo(); - void poll(ProxyTrigger *trigger); - + void poll(ProxyTrigger* trigger); + void pop(); - + void flushTail(bool sync = false); DeviceProxyFifo toDevice(); diff --git a/src/include/proxy.h b/src/include/proxy.h index 3746806b..5bcb7da5 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -60,7 +60,7 @@ struct mscclppProxyState int numaNodeToBind; mscclpp::IbCtx* ibContext; // For IB connection only - cudaStream_t p2pStream; // for P2P DMA engine only + cudaStream_t p2pStream; // for P2P DMA engine only struct mscclppProxyFifo fifo; }; diff --git a/src/include/proxy.hpp b/src/include/proxy.hpp index 70b6ba49..ac4116b3 100644 --- a/src/include/proxy.hpp +++ b/src/include/proxy.hpp @@ -3,12 +3,13 @@ #include -#include #include +#include namespace mscclpp { -enum class ProxyHandlerResult { +enum class ProxyHandlerResult +{ Continue, FlushFifoTailAndContinue, Stop, @@ -17,7 +18,8 @@ enum class ProxyHandlerResult { class Proxy; using ProxyHandler = std::function; -class Proxy { +class Proxy +{ public: Proxy(ProxyHandler handler); @@ -26,7 +28,7 @@ public: void start(); void stop(); - + HostProxyFifo& fifo(); private: diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp index afe42da4..1c37ff04 100644 --- a/src/include/registered_memory.hpp +++ b/src/include/registered_memory.hpp @@ -1,15 +1,16 @@ #ifndef MSCCLPP_REGISTERED_MEMORY_HPP_ #define MSCCLPP_REGISTERED_MEMORY_HPP_ -#include "mscclpp.hpp" -#include "mscclpp.h" -#include "ib.hpp" #include "communicator.hpp" +#include "ib.hpp" +#include "mscclpp.h" +#include "mscclpp.hpp" #include namespace mscclpp { -struct TransportInfo { +struct TransportInfo +{ Transport transport; // TODO: rewrite this using std::variant or something @@ -21,7 +22,8 @@ struct TransportInfo { }; }; -struct RegisteredMemory::Impl { +struct RegisteredMemory::Impl +{ void* data; size_t size; int rank; @@ -31,7 +33,8 @@ struct RegisteredMemory::Impl { Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(const std::vector& data); - TransportInfo& getTransportInfo(Transport transport) { + TransportInfo& getTransportInfo(Transport transport) + { for (auto& entry : transportInfos) { if (entry.transport == transport) { return entry; diff --git a/src/include/registered_ptr.hpp b/src/include/registered_ptr.hpp index 7eadb6b0..4f03ea40 100644 --- a/src/include/registered_ptr.hpp +++ b/src/include/registered_ptr.hpp @@ -3,32 +3,44 @@ namespace mscclpp { -template -class RegisteredPtr { +template class RegisteredPtr +{ RegisteredMemory memory; size_t offset; -public: - RegisteredPtr(RegisteredMemory memory, size_t offset) : memory(memory), offset(offset) {} - RegisteredPtr(RegisteredMemory memory) : RegisteredPtr(memory, 0) {} - ~RegisteredPtr() {} - RegisteredMemory memory() { +public: + RegisteredPtr(RegisteredMemory memory, size_t offset) : memory(memory), offset(offset) + { + } + RegisteredPtr(RegisteredMemory memory) : RegisteredPtr(memory, 0) + { + } + ~RegisteredPtr() + { + } + + RegisteredMemory memory() + { return memory; } - T* data() { + T* data() + { return reinterpret_cast(memory.data()); } - size_t size() { + size_t size() + { return memory.size() / sizeof(T); } - size_t offset() { + size_t offset() + { return offset; } - RegisteredPtr operator+(size_t offset) { + RegisteredPtr operator+(size_t offset) + { return RegisteredPtr(memory, this->offset + offset); } diff --git a/src/init.cc b/src/init.cc index c5b6a66b..03f037c4 100644 --- a/src/init.cc +++ b/src/init.cc @@ -6,8 +6,8 @@ #if defined(MSCCLPP_USE_GDRCOPY) #include "gdr.h" #endif -#include "mscclpp.h" #include "infiniband/verbs.h" +#include "mscclpp.h" #include #include #include @@ -327,7 +327,8 @@ struct mscclppHostP2PConn : mscclppHostConn { put(1, dstDataOffset, 1, srcDataOffset, dataSize); } - void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, uint64_t dataSize) + void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, + uint64_t dataSize) { void* srcBuff = (void*)((char*)conn->bufferRegistrations[src].data + srcDataOffset); void* dstBuff = (void*)((char*)conn->remoteBufferRegistrations[dst].data + dstDataOffset); @@ -365,7 +366,8 @@ struct mscclppHostIBConn : mscclppHostConn { put(1, dstDataOffset, 1, srcDataOffset, dataSize); } - void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, uint64_t dataSize) + void put(mscclppBufferHandle_t dst, uint64_t dstDataOffset, mscclppBufferHandle_t src, uint64_t srcDataOffset, + uint64_t dataSize) { this->ibQp->stageSend(this->ibMrs[src], this->remoteIbMrInfos[dst], (uint32_t)dataSize, /*wrId=*/0, /*srcOffset=*/srcDataOffset, /*dstOffset=*/dstDataOffset, /*signaled=*/false); @@ -413,7 +415,8 @@ struct mscclppHostIBConn : mscclppHostConn std::vector remoteIbMrInfos; }; -MSCCLPP_API mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, mscclppTransport_t transportType, const char* ibDev) +MSCCLPP_API mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int remoteRank, int tag, + mscclppTransport_t transportType, const char* ibDev) { // save this processes numa binding and set it to the one closest to the device // so that all the allocation are close to the device @@ -550,7 +553,8 @@ MSCCLPP_API mscclppResult_t mscclppConnectWithoutBuffer(mscclppComm_t comm, int MSCCLPPCHECK(setNumaState(curProcessState)); mscclppBufferHandle_t signalHandle = -1; - MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, connId, conn->devConn->localSignalEpochId, sizeof(mscclppDevConnSignalEpochId), &signalHandle)); + MSCCLPPCHECK(mscclppRegisterBufferForConnection(comm, connId, conn->devConn->localSignalEpochId, + sizeof(mscclppDevConnSignalEpochId), &signalHandle)); if (signalHandle != 0) { WARN("signal handle should be 0"); return mscclppInternalError; @@ -579,7 +583,9 @@ MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, i return mscclppSuccess; } -MSCCLPP_API mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, uint64_t buffSize, mscclppBufferHandle_t *handle) { +MSCCLPP_API mscclppResult_t mscclppRegisterBufferForConnection(mscclppComm_t comm, int connIdx, void* localBuff, + uint64_t buffSize, mscclppBufferHandle_t* handle) +{ if (connIdx >= comm->nConns) { WARN("connIdx out of range"); return mscclppInvalidArgument; @@ -605,26 +611,31 @@ struct connInfo mscclpp::IbQpInfo infoQp; std::vector bufferInfos; - struct header { + struct header + { mscclpp::IbQpInfo infoQp; int numBufferInfos; }; - mscclppResult_t sendOverBootstrap(void* bootstrap, int remoteRank, int tag) { + mscclppResult_t sendOverBootstrap(void* bootstrap, int remoteRank, int tag) + { header h; h.infoQp = infoQp; h.numBufferInfos = bufferInfos.size(); MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, &h, sizeof(header))); - MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, bufferInfos.data(), bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); + MSCCLPPCHECK(bootstrapSend(bootstrap, remoteRank, tag, bufferInfos.data(), + bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); return mscclppSuccess; } - mscclppResult_t recvOverBootstrap(void* bootstrap, int remoteRank, int tag) { + mscclppResult_t recvOverBootstrap(void* bootstrap, int remoteRank, int tag) + { header h; MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, &h, sizeof(header))); infoQp = h.infoQp; bufferInfos.resize(h.numBufferInfos); - MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, bufferInfos.data(), bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); + MSCCLPPCHECK(bootstrapRecv(bootstrap, remoteRank, tag, bufferInfos.data(), + bufferInfos.size() * sizeof(mscclppBufferRegistrationInfo))); return mscclppSuccess; } }; @@ -637,7 +648,7 @@ mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*input } // Add all registered buffers - for (const auto &bufReg : conn->bufferRegistrations) { + for (const auto& bufReg : conn->bufferRegistrations) { connInfo->bufferInfos.emplace_back(); CUDACHECK(cudaIpcGetMemHandle(&connInfo->bufferInfos.back().cudaHandle, bufReg.data)); connInfo->bufferInfos.back().size = bufReg.size; @@ -659,7 +670,8 @@ mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/ // Open all remote registered buffers for (size_t i = 0; i < connInfo->bufferInfos.size(); i++) { mscclppBufferRegistration newBufReg; - CUDACHECK(cudaIpcOpenMemHandle(&newBufReg.data, connInfo->bufferInfos[i].cudaHandle, cudaIpcMemLazyEnablePeerAccess)); + CUDACHECK( + cudaIpcOpenMemHandle(&newBufReg.data, connInfo->bufferInfos[i].cudaHandle, cudaIpcMemLazyEnablePeerAccess)); newBufReg.size = connInfo->bufferInfos[i].size; conn->remoteBufferRegistrations.push_back(newBufReg); } @@ -670,8 +682,8 @@ mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/ } conn->devConn->remoteSignalEpochId = (mscclppDevConnSignalEpochId*)conn->remoteBufferRegistrations[0].data; - // For backwards compatibility with the previous API that assumed one data buffer per connection, set the remote buffer - // to the first remote data buffer + // For backwards compatibility with the previous API that assumed one data buffer per connection, set the remote + // buffer to the first remote data buffer if (conn->remoteBufferRegistrations.size() > 1) { conn->devConn->remoteBuff = conn->remoteBufferRegistrations[1].data; } @@ -695,7 +707,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output } // Add all registered buffers - for (const auto &bufReg : conn->bufferRegistrations) { + for (const auto& bufReg : conn->bufferRegistrations) { hostConn->ibMrs.emplace_back(ibCtx->registerMr(bufReg.data, sizeof(struct mscclppDevConnSignalEpochId))); connInfo->bufferInfos.emplace_back(); connInfo->bufferInfos.back().ibMrInfo = hostConn->ibMrs.back()->getInfo(); @@ -743,7 +755,8 @@ MSCCLPP_API mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm) MSCCLPPCHECK(mscclppIbConnectionSetupStart(&cInfo, conn)); } // TODO: from saemal: do we possibly deadlock if there are too many outstanding sends? - // MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo))); + // MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &cInfo, + // sizeof(cInfo))); MSCCLPPCHECK(cInfo.sendOverBootstrap(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag)); } diff --git a/src/proxy_cpp.cc b/src/proxy_cpp.cc index 2d1cf098..b55d6995 100644 --- a/src/proxy_cpp.cc +++ b/src/proxy_cpp.cc @@ -1,8 +1,8 @@ +#include "api.h" #include "mscclpp.hpp" #include "utils.h" -#include "api.h" -#include #include +#include namespace mscclpp { @@ -10,26 +10,32 @@ const int ProxyStopCheckPeriod = 1000; const int ProxyFlushPeriod = 4; -struct Proxy::Impl { +struct Proxy::Impl +{ ProxyHandler handler; HostProxyFifo fifo; std::thread service; std::atomic_bool running; - Impl(ProxyHandler handler) : handler(handler), running(false) {} + Impl(ProxyHandler handler) : handler(handler), running(false) + { + } }; -MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler) { +MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler) +{ pimpl = std::make_unique(handler); } -MSCCLPP_API_CPP Proxy::~Proxy() { +MSCCLPP_API_CPP Proxy::~Proxy() +{ if (pimpl) { stop(); } } -MSCCLPP_API_CPP void Proxy::start() { +MSCCLPP_API_CPP void Proxy::start() +{ pimpl->running = true; pimpl->service = std::thread([this] { // from this point on, proxy thread will stay close to the device @@ -52,7 +58,7 @@ MSCCLPP_API_CPP void Proxy::start() { // Poll to see if we are ready to send anything fifo.poll(&trigger); if (trigger.fst == 0) { // TODO: this check is a potential pitfall for custom triggers - continue; // there is one in progress + continue; // there is one in progress } ProxyHandlerResult result = handler(trigger); @@ -83,14 +89,16 @@ MSCCLPP_API_CPP void Proxy::start() { }); } -MSCCLPP_API_CPP void Proxy::stop() { +MSCCLPP_API_CPP void Proxy::stop() +{ pimpl->running = false; if (pimpl->service.joinable()) { pimpl->service.join(); } } -MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo() { +MSCCLPP_API_CPP HostProxyFifo& Proxy::fifo() +{ return pimpl->fifo; } diff --git a/src/registered_memory.cc b/src/registered_memory.cc index b26ea2d5..b9769dc9 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -1,10 +1,13 @@ #include "registered_memory.hpp" #include "checks.hpp" #include +#include namespace mscclpp { -RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl) : data(data), size(size), rank(rank), transports(transports) { +RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl) + : data(data), size(size), rank(rank), transports(transports) +{ if (transports.has(Transport::CudaIpc)) { TransportInfo transportInfo; transportInfo.transport = Transport::CudaIpc; @@ -23,38 +26,53 @@ RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags t transportInfo.ibLocal = true; this->transportInfos.push_back(transportInfo); }; - if (transports.has(Transport::IB0)) addIb(Transport::IB0); - if (transports.has(Transport::IB1)) addIb(Transport::IB1); - if (transports.has(Transport::IB2)) addIb(Transport::IB2); - if (transports.has(Transport::IB3)) addIb(Transport::IB3); - if (transports.has(Transport::IB4)) addIb(Transport::IB4); - if (transports.has(Transport::IB5)) addIb(Transport::IB5); - if (transports.has(Transport::IB6)) addIb(Transport::IB6); - if (transports.has(Transport::IB7)) addIb(Transport::IB7); + if (transports.has(Transport::IB0)) + addIb(Transport::IB0); + if (transports.has(Transport::IB1)) + addIb(Transport::IB1); + if (transports.has(Transport::IB2)) + addIb(Transport::IB2); + if (transports.has(Transport::IB3)) + addIb(Transport::IB3); + if (transports.has(Transport::IB4)) + addIb(Transport::IB4); + if (transports.has(Transport::IB5)) + addIb(Transport::IB5); + if (transports.has(Transport::IB6)) + addIb(Transport::IB6); + if (transports.has(Transport::IB7)) + addIb(Transport::IB7); } } -RegisteredMemory::RegisteredMemory(std::shared_ptr pimpl) : pimpl(pimpl) {} +RegisteredMemory::RegisteredMemory(std::shared_ptr pimpl) : pimpl(pimpl) +{ +} RegisteredMemory::~RegisteredMemory() = default; -void* RegisteredMemory::data() { +void* RegisteredMemory::data() +{ return pimpl->data; } -size_t RegisteredMemory::size() { +size_t RegisteredMemory::size() +{ return pimpl->size; } -int RegisteredMemory::rank() { +int RegisteredMemory::rank() +{ return pimpl->rank; } -TransportFlags RegisteredMemory::transports() { +TransportFlags RegisteredMemory::transports() +{ return pimpl->transports; } -std::vector RegisteredMemory::serialize() { +std::vector RegisteredMemory::serialize() +{ std::vector result; std::copy_n(reinterpret_cast(&pimpl->size), sizeof(pimpl->size), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->rank), sizeof(pimpl->rank), std::back_inserter(result)); @@ -67,7 +85,8 @@ std::vector RegisteredMemory::serialize() { for (auto& entry : pimpl->transportInfos) { std::copy_n(reinterpret_cast(&entry.transport), sizeof(entry.transport), std::back_inserter(result)); if (entry.transport == Transport::CudaIpc) { - std::copy_n(reinterpret_cast(&entry.cudaIpcHandle), sizeof(entry.cudaIpcHandle), std::back_inserter(result)); + std::copy_n(reinterpret_cast(&entry.cudaIpcHandle), sizeof(entry.cudaIpcHandle), + std::back_inserter(result)); } else if (AllIBTransports.has(entry.transport)) { std::copy_n(reinterpret_cast(&entry.ibMrInfo), sizeof(entry.ibMrInfo), std::back_inserter(result)); } else { @@ -77,11 +96,13 @@ std::vector RegisteredMemory::serialize() { return result; } -RegisteredMemory RegisteredMemory::deserialize(const std::vector& data) { +RegisteredMemory RegisteredMemory::deserialize(const std::vector& data) +{ return RegisteredMemory(std::make_shared(data)); } -RegisteredMemory::Impl::Impl(const std::vector& serialization) { +RegisteredMemory::Impl::Impl(const std::vector& serialization) +{ auto it = serialization.begin(); std::copy_n(it, sizeof(this->size), reinterpret_cast(&this->size)); it += sizeof(this->size); @@ -118,6 +139,9 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { if (transports.has(Transport::CudaIpc)) { auto entry = getTransportInfo(Transport::CudaIpc); + void* baseDataPtr; + size_t baseDataSize; // dummy + CUTHROW(cuMemGetAddressRange((CUdeviceptr*)&baseDataPtr, &baseDataSize, (CUdeviceptr)data)); CUDATHROW(cudaIpcOpenMemHandle(&data, entry.cudaIpcHandle, cudaIpcMemLazyEnablePeerAccess)); } } diff --git a/tests/allgather_test_cpp.cu b/tests/allgather_test_cpp.cu index 9b056e84..908a24f4 100644 --- a/tests/allgather_test_cpp.cu +++ b/tests/allgather_test_cpp.cu @@ -4,14 +4,14 @@ #ifdef MSCCLPP_USE_MPI_FOR_TESTS #include "mpi.h" #endif // MSCCLPP_USE_MPI_FOR_TESTS +#include +#include #include #include #include #include #include #include -#include -#include static int nranksPerNode = 8; @@ -50,7 +50,8 @@ static double getTime(void) __constant__ mscclpp::SimpleDeviceConnection constDevConns[16]; -__device__ void allgather0(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) +__device__ void allgather0(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int remoteRank, + size_t nelemsPerGPU) { // this allgather is really simple and implemented as an alltoall @@ -69,8 +70,8 @@ __device__ void allgather0(mscclpp::SimpleDeviceConnection devConn, int rank, in devConn.wait(); } -__device__ void localAllGather(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - uint64_t offset, uint64_t size) +__device__ void localAllGather(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, + int remoteRank, uint64_t offset, uint64_t size) { // this allgather algorithm works as follows: // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode @@ -93,15 +94,15 @@ __device__ void localAllGather(mscclpp::SimpleDeviceConnection devConn, int rank } } -__device__ void allgather1(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) +__device__ void allgather1(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, + int remoteRank, size_t nelemsPerGPU) { localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); } -__device__ void allgather2(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) +__device__ void allgather2(mscclpp::SimpleDeviceConnection devConn, int rank, int world_size, int nranksPerNode, + int remoteRank, size_t nelemsPerGPU) { // this allgather is a pipelined and hierarchical one and only works for two nodes // it is implemented as follows: @@ -243,13 +244,13 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co comm.connectionSetup(); std::vector devConns; - std::transform(hostConns.begin(), hostConns.end(), std::back_inserter(devConns), - [](std::shared_ptr& hostConn) { - return mscclpp::SimpleDeviceConnection(*hostConn); - }); + std::transform( + hostConns.begin(), hostConns.end(), std::back_inserter(devConns), + [](std::shared_ptr& hostConn) { return mscclpp::SimpleDeviceConnection(*hostConn); }); assert(devConns.size() < sizeof(constDevConns) / sizeof(mscclpp::SimpleDeviceConnection)); - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns.data(), sizeof(mscclpp::SimpleDeviceConnection) * devConns.size() )); + CUDACHECK( + cudaMemcpyToSymbol(constDevConns, devConns.data(), sizeof(mscclpp::SimpleDeviceConnection) * devConns.size())); } void printUsage(const char* prog, bool isMpi) @@ -399,17 +400,17 @@ int main(int argc, const char* argv[]) } size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; - try{ + try { if (rank == 0) - printf("Initializing MSCCL++\n"); + printf("Initializing MSCCL++\n"); mscclpp::Communicator comm(world_size, ip_port, rank); if (rank == 0) - printf("Initializing data for allgather test\n"); + printf("Initializing data for allgather test\n"); initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); + printf("Setting up the connection in MSCCL++\n"); setupMscclppConnections(rank, world_size, comm, data_d, dataSize); if (rank == 0) @@ -466,7 +467,7 @@ int main(int argc, const char* argv[]) int cudagraphwarmup = 10; if (rank == 0) printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, - cudagraphiter); + cudagraphiter); for (int i = 0; i < cudagraphwarmup; ++i) { cudaGraphLaunch(instance, stream); } @@ -476,7 +477,7 @@ int main(int argc, const char* argv[]) int cudagraphlaunch = 10; if (rank == 0) printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, - cudagraphiter); + cudagraphiter); comm.bootstrapAllGather(tmp, sizeof(int)); double t0, t1, ms, time_in_us; t0 = getTime(); @@ -489,7 +490,7 @@ int main(int argc, const char* argv[]) ms = (t1 - t0) * 1000.0; time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); + (double)(dataSize) / 1e9 / (time_in_us / 1e6)); comm.bootstrapAllGather(tmp, sizeof(int)); if (rank == 0) diff --git a/tests/bootstrap_test_cpp.cc b/tests/bootstrap_test_cpp.cc index bdde8467..e4fe65bb 100644 --- a/tests/bootstrap_test_cpp.cc +++ b/tests/bootstrap_test_cpp.cc @@ -1,11 +1,12 @@ #include "mscclpp.hpp" -#include #include #include +#include #include -void test_allgather(std::shared_ptr bootstrap){ +void test_allgather(std::shared_ptr bootstrap) +{ std::vector tmp(bootstrap->getNranks(), 0); tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; bootstrap->allGather(tmp.data(), sizeof(int)); @@ -16,13 +17,15 @@ void test_allgather(std::shared_ptr bootstrap){ std::cout << "AllGather test passed!" << std::endl; } -void test_barrier(std::shared_ptr bootstrap){ +void test_barrier(std::shared_ptr bootstrap) +{ bootstrap->barrier(); if (bootstrap->getRank() == 0) std::cout << "Barrier test passed!" << std::endl; } -void test_sendrecv(std::shared_ptr bootstrap){ +void test_sendrecv(std::shared_ptr bootstrap) +{ for (int i = 0; i < bootstrap->getNranks(); i++) { if (bootstrap->getRank() == i) continue; @@ -52,13 +55,15 @@ void test_sendrecv(std::shared_ptr bootstrap){ std::cout << "Send/Recv test passed!" << std::endl; } -void test_all(std::shared_ptr bootstrap){ +void test_all(std::shared_ptr bootstrap) +{ test_allgather(bootstrap); test_barrier(bootstrap); test_sendrecv(bootstrap); } -void test_mscclpp_bootstrap_with_id(int rank, int worldSize){ +void test_mscclpp_bootstrap_with_id(int rank, int worldSize) +{ auto bootstrap = std::make_shared(rank, worldSize); mscclpp::UniqueId id; if (bootstrap->getRank() == 0) @@ -71,7 +76,8 @@ void test_mscclpp_bootstrap_with_id(int rank, int worldSize){ std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl; } -void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPiar){ +void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPiar) +{ std::shared_ptr bootstrap(new mscclpp::Bootstrap(rank, worldSize)); bootstrap->initialize(ipPortPiar); @@ -80,47 +86,57 @@ void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipP std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl; } -class MPIBootstrap : public mscclpp::BaseBootstrap { +class MPIBootstrap : public mscclpp::BaseBootstrap +{ public: - MPIBootstrap() : BaseBootstrap() {} - int getRank() override { + MPIBootstrap() : BaseBootstrap() + { + } + int getRank() override + { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); return rank; } - int getNranks() override { + int getNranks() override + { int worldSize; MPI_Comm_size(MPI_COMM_WORLD, &worldSize); return worldSize; } - void allGather(void *sendbuf, int size) override { + void allGather(void* sendbuf, int size) override + { MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); } - void barrier() override { + void barrier() override + { MPI_Barrier(MPI_COMM_WORLD); } - void send(void *sendbuf, int size, int dest, int tag) override { + void send(void* sendbuf, int size, int dest, int tag) override + { MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); } - void recv(void *recvbuf, int size, int source, int tag) override { + void recv(void* recvbuf, int size, int source, int tag) override + { MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } }; -void test_mpi_bootstrap(){ +void test_mpi_bootstrap() +{ std::shared_ptr bootstrap(new MPIBootstrap()); test_all(bootstrap); if (bootstrap->getRank() == 0) std::cout << "--- MPI Bootstrap test passed! ---" << std::endl; } -int main(int argc, char **argv) +int main(int argc, char** argv) { int rank, worldSize; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - if (argc > 2){ + if (argc > 2) { if (rank == 0) std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl; MPI_Finalize(); diff --git a/tests/communicator_test_cpp.cc b/tests/communicator_test_cpp.cc index 1f14ca79..6864d97b 100644 --- a/tests/communicator_test_cpp.cc +++ b/tests/communicator_test_cpp.cc @@ -1,25 +1,20 @@ #include "mscclpp.hpp" -#include #include #include +#include #include -mscclpp::Transport findIb(int localRank){ - mscclpp::Transport IBs[] = { - mscclpp::Transport::IB0, - mscclpp::Transport::IB1, - mscclpp::Transport::IB2, - mscclpp::Transport::IB3, - mscclpp::Transport::IB4, - mscclpp::Transport::IB5, - mscclpp::Transport::IB6, - mscclpp::Transport::IB7 - }; +mscclpp::Transport findIb(int localRank) +{ + mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, + mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, + mscclpp::Transport::IB6, mscclpp::Transport::IB7}; return IBs[localRank]; } -void test_communicator(int rank, int worldSize, int nranksPerNode){ +void test_communicator(int rank, int worldSize, int nranksPerNode) +{ auto bootstrap = std::make_shared(rank, worldSize); mscclpp::UniqueId id; if (bootstrap->getRank() == 0) @@ -28,9 +23,9 @@ void test_communicator(int rank, int worldSize, int nranksPerNode){ bootstrap->initialize(id); auto communicator = std::make_shared(bootstrap); - for (int i = 0; i < worldSize; i++){ - if (i != rank){ - if (i / nranksPerNode == rank / nranksPerNode){ + for (int i = 0; i < worldSize; i++) { + if (i != rank) { + if (i / nranksPerNode == rank / nranksPerNode) { auto connect = communicator->connect(i, 0, mscclpp::Transport::CudaIpc); } else { auto connect = communicator->connect(i, 0, findIb(rank % nranksPerNode)); @@ -43,8 +38,7 @@ void test_communicator(int rank, int worldSize, int nranksPerNode){ std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl; } - -int main(int argc, char **argv) +int main(int argc, char** argv) { int rank, worldSize; MPI_Init(&argc, &argv); @@ -56,7 +50,7 @@ int main(int argc, char **argv) MPI_Comm_size(shmcomm, &shmWorldSize); int nranksPerNode = shmWorldSize; MPI_Comm_free(&shmcomm); - + test_communicator(rank, worldSize, nranksPerNode); MPI_Finalize(); diff --git a/tests/unittests/ib_test.cc b/tests/unittests/ib_test.cc index 6f84398f..3d99acb2 100644 --- a/tests/unittests/ib_test.cc +++ b/tests/unittests/ib_test.cc @@ -3,8 +3,8 @@ #include "ib.hpp" #include "infiniband/verbs.h" #include "mscclpp.hpp" -#include #include +#include // Measure current time in second. static double getTime(void)