From f68eeba2d453fba48c8478ea4ff37eac64b881a9 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Sat, 8 Apr 2023 05:29:34 +0000 Subject: [PATCH 1/4] change clock collection approach --- src/include/npkit/npkit.h | 11 ++++------- src/misc/npkit.cc | 39 ++++++++------------------------------- src/proxy.cc | 6 ++---- 3 files changed, 14 insertions(+), 42 deletions(-) diff --git a/src/include/npkit/npkit.h b/src/include/npkit/npkit.h index 4e208050..a0691afd 100644 --- a/src/include/npkit/npkit.h +++ b/src/include/npkit/npkit.h @@ -40,11 +40,9 @@ public: static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); - static uint64_t* GetCpuTimestamp(); + static uint64_t GetCpuTimestamp(); private: - static void CpuTimestampUpdateThread(); - // 64K * 512 * 16B = 512MB per GPU static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16; @@ -56,12 +54,11 @@ private: static NpKitEventCollectContext* gpu_collect_contexts_; static NpKitEventCollectContext* cpu_collect_contexts_; - static uint64_t* cpu_timestamp_; + + static uint64_t cpu_base_system_timestamp_; + static uint64_t cpu_base_steady_timestamp_; static uint64_t rank_; - - static std::thread* cpu_timestamp_update_thread_; - static volatile bool cpu_timestamp_update_thread_should_stop_; }; #endif \ No newline at end of file diff --git a/src/misc/npkit.cc b/src/misc/npkit.cc index 99a8cde8..4a7eb849 100644 --- a/src/misc/npkit.cc +++ b/src/misc/npkit.cc @@ -12,22 +12,8 @@ NpKitEvent** NpKit::cpu_event_buffers_ = nullptr; NpKitEventCollectContext* NpKit::gpu_collect_contexts_ = nullptr; NpKitEventCollectContext* NpKit::cpu_collect_contexts_ = nullptr; -uint64_t* NpKit::cpu_timestamp_ = nullptr; - -std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr; -volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false; - -void NpKit::CpuTimestampUpdateThread() -{ - uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); - uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); - uint64_t curr_steady_clock = 0; - volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_; - while (!cpu_timestamp_update_thread_should_stop_) { - curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); - *volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); - } -} +uint64_t NpKit::cpu_base_system_timestamp_ = 0; +uint64_t NpKit::cpu_base_steady_timestamp_ = 0; mscclppResult_t NpKit::Init(int rank) { @@ -54,11 +40,8 @@ mscclppResult_t NpKit::Init(int rank) } // Init timestamp - MSCCLPPCHECK(mscclppCudaHostCalloc(&cpu_timestamp_, 1)); - volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_; - *volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - cpu_timestamp_update_thread_should_stop_ = false; - cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); + cpu_base_system_timestamp_ = std::chrono::system_clock::now().time_since_epoch().count(); + cpu_base_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count(); return mscclppSuccess; } @@ -133,10 +116,6 @@ mscclppResult_t NpKit::Shutdown() { uint64_t i = 0; - // Stop CPU timestamp updating thread - cpu_timestamp_update_thread_should_stop_ = true; - cpu_timestamp_update_thread_->join(); - // Free CPU event data structures for (i = 0; i < kNumCpuEventBuffers; i++) { free(cpu_event_buffers_[i]); @@ -151,9 +130,6 @@ mscclppResult_t NpKit::Shutdown() free(gpu_event_buffers_); CUDACHECK(cudaFree(gpu_collect_contexts_)); - // Free timestamp - MSCCLPPCHECK(mscclppCudaHostFree(cpu_timestamp_)); - return mscclppSuccess; } @@ -175,7 +151,8 @@ void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t } } -uint64_t* NpKit::GetCpuTimestamp() +uint64_t NpKit::GetCpuTimestamp() { - return cpu_timestamp_; -} \ No newline at end of file + uint64_t cpu_curr_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count(); + return cpu_base_steady_timestamp_ + (cpu_curr_steady_timestamp_ - cpu_base_steady_timestamp_); +} diff --git a/src/proxy.cc b/src/proxy.cc index 8d2fe6be..2c670fb6 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -62,7 +62,6 @@ static void npkitInitReqIds(struct mscclppComm* comm) static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint32_t size, int channelId) { - uint64_t ts = *(volatile uint64_t*)NpKit::GetCpuTimestamp(); uint64_t reqId = 0; if (conn->npkitFreeReqIds.size() == 0) { reqId = conn->npkitUsedReqIds.size(); @@ -71,15 +70,14 @@ static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint3 conn->npkitFreeReqIds.pop_back(); } conn->npkitUsedReqIds.push_back(reqId); - NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, ts, channelId); + NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, NpKit::GetCpuTimestamp(), channelId); } static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type, int channelId) { - uint64_t ts = *(volatile uint64_t*)NpKit::GetCpuTimestamp(); while (conn->npkitUsedReqIds.size()) { uint64_t reqId = conn->npkitUsedReqIds.back(); - NpKit::CollectCpuEvent(type, 0, (uint32_t)reqId, ts, channelId); + NpKit::CollectCpuEvent(type, 0, (uint32_t)reqId, NpKit::GetCpuTimestamp(), channelId); conn->npkitFreeReqIds.push_back(reqId); conn->npkitUsedReqIds.pop_back(); } From 748d3d1596f297623fe51648ea3d9852d0856a3a Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Sat, 8 Apr 2023 07:12:46 +0000 Subject: [PATCH 2/4] separate flag and data --- src/include/npkit/npkit_event.h | 12 +++++------- src/proxy.cc | 8 ++++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/include/npkit/npkit_event.h b/src/include/npkit/npkit_event.h index 0b300778..26087469 100644 --- a/src/include/npkit/npkit_event.h +++ b/src/include/npkit/npkit_event.h @@ -9,14 +9,12 @@ #define NPKIT_EVENT_SM_REDUCE_ENTRY 0x3 #define NPKIT_EVENT_SM_REDUCE_EXIT 0x4 -#define NPKIT_EVENT_IB_SEND_ENTRY 0x5 -#define NPKIT_EVENT_IB_SEND_EXIT 0x6 -#define NPKIT_EVENT_IB_RECV_ENTRY 0x7 -#define NPKIT_EVENT_IB_RECV_EXIT 0x8 +#define NPKIT_EVENT_IB_SEND_DATA_ENTRY 0x5 +#define NPKIT_EVENT_IB_SEND_FLAG_ENTRY 0x6 +#define NPKIT_EVENT_IB_SEND_EXIT 0x7 -#define NPKIT_EVENT_DMA_SEND_ENTRY 0x9 +#define NPKIT_EVENT_DMA_SEND_DATA_ENTRY 0x8 +#define NPKIT_EVENT_DMA_SEND_FLAG_ENTRY 0x9 #define NPKIT_EVENT_DMA_SEND_EXIT 0xA -#define NPKIT_EVENT_DMA_RECV_ENTRY 0xB -#define NPKIT_EVENT_DMA_RECV_EXIT 0xC #endif \ No newline at end of file diff --git a/src/proxy.cc b/src/proxy.cc index 2c670fb6..c55f492b 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -143,7 +143,7 @@ void* mscclppProxyService(void* _args) void* srcBuff = (void*)((char*)conn->devConn->localBuff + trigger.fields.srcDataOffset); void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + trigger.fields.dstDataOffset); PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_ENTRY, (uint32_t)trigger.fields.dataSize, + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)trigger.fields.dataSize, trigger.fields.connId); } else { conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, @@ -154,7 +154,7 @@ void* mscclppProxyService(void* _args) // Return value is errno. WARN("data postSend failed: errno %d", ret); } - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize, + npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)trigger.fields.dataSize, trigger.fields.connId); } } @@ -162,7 +162,7 @@ void* mscclppProxyService(void* _args) if (isP2pProxy) { PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->sendEpochId, sizeof(uint64_t), cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId); + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId); } else { // My local flag is copied to the peer's proxy flag conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), @@ -170,7 +170,7 @@ void* mscclppProxyService(void* _args) if ((ret = conn->ibQp->postSend()) != 0) { WARN("flag postSend failed: errno %d", ret); } - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId); + npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId); } } // Wait for completion From 09de60854ecd2393ab64e73dc6c48b27cee59793 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Sat, 8 Apr 2023 07:15:25 +0000 Subject: [PATCH 3/4] fix lint --- src/proxy.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/proxy.cc b/src/proxy.cc index c55f492b..555a19d0 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -162,7 +162,8 @@ void* mscclppProxyService(void* _args) if (isP2pProxy) { PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->sendEpochId, sizeof(uint64_t), cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId); + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, + (uint32_t)sizeof(uint64_t), trigger.fields.connId); } else { // My local flag is copied to the peer's proxy flag conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), From 5f0b58abdaf440f7cbfb285380a82acec7ed948f Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Sat, 8 Apr 2023 07:16:32 +0000 Subject: [PATCH 4/4] fix lint --- src/proxy.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index 555a19d0..ac704bc6 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -162,8 +162,8 @@ void* mscclppProxyService(void* _args) if (isP2pProxy) { PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->sendEpochId, sizeof(uint64_t), cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, - (uint32_t)sizeof(uint64_t), trigger.fields.connId); + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t), + trigger.fields.connId); } else { // My local flag is copied to the peer's proxy flag conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t),