mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 17:26:04 +00:00
Merge pull request #52 from microsoft/ziyyang/npkit-fix-numa
NPKit: remove timestamp update thread
This commit is contained in:
@@ -40,11 +40,9 @@ public:
|
||||
|
||||
static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id);
|
||||
|
||||
static uint64_t* GetCpuTimestamp();
|
||||
static uint64_t GetCpuTimestamp();
|
||||
|
||||
private:
|
||||
static void CpuTimestampUpdateThread();
|
||||
|
||||
// 64K * 512 * 16B = 512MB per GPU
|
||||
static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16;
|
||||
|
||||
@@ -56,12 +54,11 @@ private:
|
||||
|
||||
static NpKitEventCollectContext* gpu_collect_contexts_;
|
||||
static NpKitEventCollectContext* cpu_collect_contexts_;
|
||||
static uint64_t* cpu_timestamp_;
|
||||
|
||||
static uint64_t cpu_base_system_timestamp_;
|
||||
static uint64_t cpu_base_steady_timestamp_;
|
||||
|
||||
static uint64_t rank_;
|
||||
|
||||
static std::thread* cpu_timestamp_update_thread_;
|
||||
static volatile bool cpu_timestamp_update_thread_should_stop_;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -9,14 +9,12 @@
|
||||
#define NPKIT_EVENT_SM_REDUCE_ENTRY 0x3
|
||||
#define NPKIT_EVENT_SM_REDUCE_EXIT 0x4
|
||||
|
||||
#define NPKIT_EVENT_IB_SEND_ENTRY 0x5
|
||||
#define NPKIT_EVENT_IB_SEND_EXIT 0x6
|
||||
#define NPKIT_EVENT_IB_RECV_ENTRY 0x7
|
||||
#define NPKIT_EVENT_IB_RECV_EXIT 0x8
|
||||
#define NPKIT_EVENT_IB_SEND_DATA_ENTRY 0x5
|
||||
#define NPKIT_EVENT_IB_SEND_FLAG_ENTRY 0x6
|
||||
#define NPKIT_EVENT_IB_SEND_EXIT 0x7
|
||||
|
||||
#define NPKIT_EVENT_DMA_SEND_ENTRY 0x9
|
||||
#define NPKIT_EVENT_DMA_SEND_DATA_ENTRY 0x8
|
||||
#define NPKIT_EVENT_DMA_SEND_FLAG_ENTRY 0x9
|
||||
#define NPKIT_EVENT_DMA_SEND_EXIT 0xA
|
||||
#define NPKIT_EVENT_DMA_RECV_ENTRY 0xB
|
||||
#define NPKIT_EVENT_DMA_RECV_EXIT 0xC
|
||||
|
||||
#endif
|
||||
@@ -12,22 +12,8 @@ NpKitEvent** NpKit::cpu_event_buffers_ = nullptr;
|
||||
|
||||
NpKitEventCollectContext* NpKit::gpu_collect_contexts_ = nullptr;
|
||||
NpKitEventCollectContext* NpKit::cpu_collect_contexts_ = nullptr;
|
||||
uint64_t* NpKit::cpu_timestamp_ = nullptr;
|
||||
|
||||
std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr;
|
||||
volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false;
|
||||
|
||||
void NpKit::CpuTimestampUpdateThread()
|
||||
{
|
||||
uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count();
|
||||
uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
uint64_t curr_steady_clock = 0;
|
||||
volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_;
|
||||
while (!cpu_timestamp_update_thread_should_stop_) {
|
||||
curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
*volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock);
|
||||
}
|
||||
}
|
||||
uint64_t NpKit::cpu_base_system_timestamp_ = 0;
|
||||
uint64_t NpKit::cpu_base_steady_timestamp_ = 0;
|
||||
|
||||
mscclppResult_t NpKit::Init(int rank)
|
||||
{
|
||||
@@ -54,11 +40,8 @@ mscclppResult_t NpKit::Init(int rank)
|
||||
}
|
||||
|
||||
// Init timestamp
|
||||
MSCCLPPCHECK(mscclppCudaHostCalloc(&cpu_timestamp_, 1));
|
||||
volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_;
|
||||
*volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count();
|
||||
cpu_timestamp_update_thread_should_stop_ = false;
|
||||
cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread);
|
||||
cpu_base_system_timestamp_ = std::chrono::system_clock::now().time_since_epoch().count();
|
||||
cpu_base_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
|
||||
return mscclppSuccess;
|
||||
}
|
||||
@@ -133,10 +116,6 @@ mscclppResult_t NpKit::Shutdown()
|
||||
{
|
||||
uint64_t i = 0;
|
||||
|
||||
// Stop CPU timestamp updating thread
|
||||
cpu_timestamp_update_thread_should_stop_ = true;
|
||||
cpu_timestamp_update_thread_->join();
|
||||
|
||||
// Free CPU event data structures
|
||||
for (i = 0; i < kNumCpuEventBuffers; i++) {
|
||||
free(cpu_event_buffers_[i]);
|
||||
@@ -151,9 +130,6 @@ mscclppResult_t NpKit::Shutdown()
|
||||
free(gpu_event_buffers_);
|
||||
CUDACHECK(cudaFree(gpu_collect_contexts_));
|
||||
|
||||
// Free timestamp
|
||||
MSCCLPPCHECK(mscclppCudaHostFree(cpu_timestamp_));
|
||||
|
||||
return mscclppSuccess;
|
||||
}
|
||||
|
||||
@@ -175,7 +151,8 @@ void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t* NpKit::GetCpuTimestamp()
|
||||
uint64_t NpKit::GetCpuTimestamp()
|
||||
{
|
||||
return cpu_timestamp_;
|
||||
}
|
||||
uint64_t cpu_curr_steady_timestamp_ = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
return cpu_base_steady_timestamp_ + (cpu_curr_steady_timestamp_ - cpu_base_steady_timestamp_);
|
||||
}
|
||||
|
||||
15
src/proxy.cc
15
src/proxy.cc
@@ -62,7 +62,6 @@ static void npkitInitReqIds(struct mscclppComm* comm)
|
||||
|
||||
static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint32_t size, int channelId)
|
||||
{
|
||||
uint64_t ts = *(volatile uint64_t*)NpKit::GetCpuTimestamp();
|
||||
uint64_t reqId = 0;
|
||||
if (conn->npkitFreeReqIds.size() == 0) {
|
||||
reqId = conn->npkitUsedReqIds.size();
|
||||
@@ -71,15 +70,14 @@ static void npkitCollectEntryEvent(struct mscclppConn* conn, uint8_t type, uint3
|
||||
conn->npkitFreeReqIds.pop_back();
|
||||
}
|
||||
conn->npkitUsedReqIds.push_back(reqId);
|
||||
NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, ts, channelId);
|
||||
NpKit::CollectCpuEvent(type, size, (uint32_t)reqId, NpKit::GetCpuTimestamp(), channelId);
|
||||
}
|
||||
|
||||
static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type, int channelId)
|
||||
{
|
||||
uint64_t ts = *(volatile uint64_t*)NpKit::GetCpuTimestamp();
|
||||
while (conn->npkitUsedReqIds.size()) {
|
||||
uint64_t reqId = conn->npkitUsedReqIds.back();
|
||||
NpKit::CollectCpuEvent(type, 0, (uint32_t)reqId, ts, channelId);
|
||||
NpKit::CollectCpuEvent(type, 0, (uint32_t)reqId, NpKit::GetCpuTimestamp(), channelId);
|
||||
conn->npkitFreeReqIds.push_back(reqId);
|
||||
conn->npkitUsedReqIds.pop_back();
|
||||
}
|
||||
@@ -145,7 +143,7 @@ void* mscclppProxyService(void* _args)
|
||||
void* srcBuff = (void*)((char*)conn->devConn->localBuff + trigger.fields.srcDataOffset);
|
||||
void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + trigger.fields.dstDataOffset);
|
||||
PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, p2pStream));
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_ENTRY, (uint32_t)trigger.fields.dataSize,
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)trigger.fields.dataSize,
|
||||
trigger.fields.connId);
|
||||
} else {
|
||||
conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize,
|
||||
@@ -156,7 +154,7 @@ void* mscclppProxyService(void* _args)
|
||||
// Return value is errno.
|
||||
WARN("data postSend failed: errno %d", ret);
|
||||
}
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize,
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)trigger.fields.dataSize,
|
||||
trigger.fields.connId);
|
||||
}
|
||||
}
|
||||
@@ -164,7 +162,8 @@ void* mscclppProxyService(void* _args)
|
||||
if (isP2pProxy) {
|
||||
PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->sendEpochId, sizeof(uint64_t),
|
||||
cudaMemcpyDeviceToDevice, p2pStream));
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId);
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t),
|
||||
trigger.fields.connId);
|
||||
} else {
|
||||
// My local flag is copied to the peer's proxy flag
|
||||
conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t),
|
||||
@@ -172,7 +171,7 @@ void* mscclppProxyService(void* _args)
|
||||
if ((ret = conn->ibQp->postSend()) != 0) {
|
||||
WARN("flag postSend failed: errno %d", ret);
|
||||
}
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId);
|
||||
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t), trigger.fields.connId);
|
||||
}
|
||||
}
|
||||
// Wait for completion
|
||||
|
||||
Reference in New Issue
Block a user