From f92b428cba0de8e2c60d9110db53ac49e7c76d22 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Fri, 24 Mar 2023 06:41:16 +0000 Subject: [PATCH] Port NPKit --- Makefile | 9 ++ src/include/npkit/npkit.h | 65 ++++++++ src/include/npkit/npkit_event.h | 22 +++ src/include/npkit/npkit_struct.h | 25 +++ src/init.cc | 29 ++++ src/misc/npkit.cc | 174 ++++++++++++++++++++ src/proxy.cc | 14 ++ tools/npkit/build_and_run_npkit.sh | 21 +++ tools/npkit/npkit_trace_generator.py | 228 +++++++++++++++++++++++++++ 9 files changed, 587 insertions(+) create mode 100644 src/include/npkit/npkit.h create mode 100644 src/include/npkit/npkit_event.h create mode 100644 src/include/npkit/npkit_struct.h create mode 100644 src/misc/npkit.cc create mode 100644 tools/npkit/build_and_run_npkit.sh create mode 100644 tools/npkit/npkit_trace_generator.py diff --git a/Makefile b/Makefile index 66435a78..bffe6e57 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ MSCCLPP_MINOR := 1 DEBUG ?= 0 VERBOSE ?= 1 TRACE ?= 0 +NPKIT ?= 0 USE_MPI_FOR_TESTS ?= 1 ######## CUDA @@ -101,10 +102,18 @@ LIBDIR := lib OBJDIR := obj BINDIR := bin +ifneq ($(NPKIT), 0) +CXXFLAGS += -DENABLE_NPKIT +NVCUFLAGS += -DENABLE_NPKIT +endif + LDFLAGS := $(NVLDFLAGS) -libverbs -lgdrapi -lnuma LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc gdr.cc init.cc proxy.cc ib.cc) LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc) +ifneq ($(NPKIT), 0) +LIBSRCS += $(addprefix src/misc/,npkit.cc) +endif LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS)) LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) diff --git a/src/include/npkit/npkit.h b/src/include/npkit/npkit.h new file mode 100644 index 00000000..9078865b --- /dev/null +++ b/src/include/npkit/npkit.h @@ -0,0 +1,65 @@ +#ifndef NPKIT_H_ +#define NPKIT_H_ + +#include +#include + +#include + +#include "npkit/npkit_event.h" +#include "npkit/npkit_struct.h" + +class NpKit { + public: + static const uint64_t kNumGpuEventBuffers = 512; + + static const uint64_t kNumCpuEventBuffers = 32; + + static mscclppResult_t Init(int rank); + + static mscclppResult_t Dump(const std::string& dump_dir); + + static mscclppResult_t Shutdown(); + + static NpKitEventCollectContext* GetGpuEventCollectContexts(); + + static inline __device__ void CollectGpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, + NpKitEventCollectContext* ctx) { + uint64_t event_buffer_head = ctx->event_buffer_head; + if (event_buffer_head < kMaxNumGpuEventsPerBuffer) { + NpKitEvent& event = ctx->event_buffer[event_buffer_head]; + event.fields.type = type; + event.fields.size = size; + event.fields.rsvd = rsvd; + event.fields.timestamp = timestamp; + ctx->event_buffer_head++; + } + } + + static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); + + static uint64_t* GetCpuTimestamp(); + + private: + static void CpuTimestampUpdateThread(); + + // 64K * 512 * 16B = 512MB per GPU + static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16; + + // 64K * 2 (send/recv) * (512/32) = 2M, 2M * 32 * 16B = 1GB per CPU + static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21; + + static NpKitEvent** gpu_event_buffers_; + static NpKitEvent** cpu_event_buffers_; + + static NpKitEventCollectContext* gpu_collect_contexts_; + static NpKitEventCollectContext* cpu_collect_contexts_; + static uint64_t* cpu_timestamp_; + + static uint64_t rank_; + + static std::thread* cpu_timestamp_update_thread_; + static volatile bool cpu_timestamp_update_thread_should_stop_; +}; + +#endif \ No newline at end of file diff --git a/src/include/npkit/npkit_event.h b/src/include/npkit/npkit_event.h new file mode 100644 index 00000000..92386c50 --- /dev/null +++ b/src/include/npkit/npkit_event.h @@ -0,0 +1,22 @@ +#ifndef NPKIT_EVENT_H_ +#define NPKIT_EVENT_H_ + +#define NPKIT_EVENT_INVALID 0x0 + +#define NPKIT_EVENT_TIME_SYNC_GPU 0x1 +#define NPKIT_EVENT_TIME_SYNC_CPU 0x2 + +#define NPKIT_EVENT_SM_REDUCE_ENTRY 0x3 +#define NPKIT_EVENT_SM_REDUCE_EXIT 0x4 + +#define NPKIT_EVENT_IB_SEND_ENTRY 0x5 +#define NPKIT_EVENT_IB_SEND_EXIT 0x6 +#define NPKIT_EVENT_IB_RECV_ENTRY 0x7 +#define NPKIT_EVENT_IB_RECV_EXIT 0x8 + +#define NPKIT_EVENT_DMA_SEND_ENTRY 0x9 +#define NPKIT_EVENT_DMA_SEND_EXIT 0xA +#define NPKIT_EVENT_DMA_RECV_ENTRY 0xB +#define NPKIT_EVENT_DMA_RECV_EXIT 0xC + +#endif \ No newline at end of file diff --git a/src/include/npkit/npkit_struct.h b/src/include/npkit/npkit_struct.h new file mode 100644 index 00000000..a18e8798 --- /dev/null +++ b/src/include/npkit/npkit_struct.h @@ -0,0 +1,25 @@ +#ifndef NPKIT_STRUCT_H_ +#define NPKIT_STRUCT_H_ + +#include + +#pragma pack(push, 1) + +union NpKitEvent { + uint64_t bits[2]; + struct { + uint64_t type : 8; + uint64_t size : 32; + uint64_t rsvd : 24; + uint64_t timestamp; + } fields; +}; + +struct NpKitEventCollectContext { + NpKitEvent* event_buffer; + uint64_t event_buffer_head; +}; + +#pragma pack(pop) + +#endif \ No newline at end of file diff --git a/src/init.cc b/src/init.cc index 6cbc858a..00c02874 100644 --- a/src/init.cc +++ b/src/init.cc @@ -4,6 +4,9 @@ #include "gdr.h" #include #include +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif static uint64_t hashUniqueId(mscclppUniqueId const &id) { char const *bytes = (char const*)&id; @@ -99,6 +102,11 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, c MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail); MSCCLPPCHECK(bootstrapInit(&handle, _comm)); +#if defined(ENABLE_NPKIT) + // Init NPKit + MSCCLPPCHECK(NpKit::Init(_comm->rank)); +#endif + // _comm->maxLocalRanks = 8; // MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToNode, nranks), res, fail); // MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToLocalRank, nranks), res, fail); @@ -163,6 +171,11 @@ mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, msccl MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail); MSCCLPPCHECK(bootstrapInit(handle, _comm)); +#if defined(ENABLE_NPKIT) + // Init NPKit + MSCCLPPCHECK(NpKit::Init(_comm->rank)); +#endif + *comm = _comm; return res; fail: @@ -176,6 +189,10 @@ fail: MSCCLPP_API(mscclppResult_t, mscclppCommDestroy, mscclppComm_t comm); mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){ +#if defined(ENABLE_NPKIT) + const char* npkitDumpDir = nullptr; +#endif + if (comm == NULL) return mscclppSuccess; @@ -215,6 +232,18 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){ mscclppCudaHostFree((void *)comm->abortFlag); free(comm); + +#if defined(ENABLE_NPKIT) + // Dump NPKit events and shutdown + npkitDumpDir = getenv("NPKIT_DUMP_DIR"); + if (npkitDumpDir == nullptr) { + WARN("NPKIT_DUMP_DIR is empty"); + } else { + MSCCLPPCHECK(NpKit::Dump(npkitDumpDir)); + } + MSCCLPPCHECK(NpKit::Shutdown()); +#endif + return mscclppSuccess; } diff --git a/src/misc/npkit.cc b/src/misc/npkit.cc new file mode 100644 index 00000000..4408a4b1 --- /dev/null +++ b/src/misc/npkit.cc @@ -0,0 +1,174 @@ +#include +#include +#include + +#include "alloc.h" +#include "npkit/npkit.h" + +uint64_t NpKit::rank_ = 0; + +NpKitEvent** NpKit::gpu_event_buffers_ = nullptr; +NpKitEvent** NpKit::cpu_event_buffers_ = nullptr; + +NpKitEventCollectContext* NpKit::gpu_collect_contexts_ = nullptr; +NpKitEventCollectContext* NpKit::cpu_collect_contexts_ = nullptr; +uint64_t* NpKit::cpu_timestamp_ = nullptr; + +std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr; +volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false; + +void NpKit::CpuTimestampUpdateThread() { + uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); + uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + uint64_t curr_steady_clock = 0; + volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_; + while (!cpu_timestamp_update_thread_should_stop_) { + curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + *volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); + } +} + +mscclppResult_t NpKit::Init(int rank) { + uint64_t i = 0; + NpKitEventCollectContext ctx; + ctx.event_buffer_head = 0; + rank_ = rank; + + // Init event data structures + MSCCLPPCHECK(mscclppCalloc(&gpu_event_buffers_, kNumGpuEventBuffers)); + MSCCLPPCHECK(mscclppCudaCalloc(&gpu_collect_contexts_, kNumGpuEventBuffers)); + for (i = 0; i < kNumGpuEventBuffers; i++) { + MSCCLPPCHECK(mscclppCudaCalloc(gpu_event_buffers_ + i, kMaxNumGpuEventsPerBuffer)); + ctx.event_buffer = gpu_event_buffers_[i]; + MSCCLPPCHECK(mscclppCudaMemcpy(gpu_collect_contexts_ + i, &ctx, 1)); + } + + MSCCLPPCHECK(mscclppCalloc(&cpu_event_buffers_, kNumCpuEventBuffers)); + MSCCLPPCHECK(mscclppCalloc(&cpu_collect_contexts_, kNumCpuEventBuffers)); + for (i = 0; i < kNumCpuEventBuffers; i++) { + MSCCLPPCHECK(mscclppCalloc(cpu_event_buffers_ + i, kMaxNumCpuEventsPerBuffer)); + ctx.event_buffer = cpu_event_buffers_[i]; + cpu_collect_contexts_[i] = ctx; + } + + // Init timestamp + MSCCLPPCHECK(mscclppCudaHostCalloc(&cpu_timestamp_, 1)); + volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_; + *volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); + cpu_timestamp_update_thread_should_stop_ = false; + cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); + + return mscclppSuccess; +} + +mscclppResult_t NpKit::Dump(const std::string& dump_dir) { + uint64_t i = 0; + std::string dump_file_path; + + // Dump CPU events + for (i = 0; i < kNumCpuEventBuffers; i++) { + dump_file_path = dump_dir; + dump_file_path += "/cpu_events_rank_"; + dump_file_path += std::to_string(rank_); + dump_file_path += "_channel_"; + dump_file_path += std::to_string(i); + auto cpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); + cpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[i]), + cpu_collect_contexts_[i].event_buffer_head * sizeof(NpKitEvent)); + cpu_trace_file.close(); + } + + // Dump CPU clock info + dump_file_path = dump_dir; + dump_file_path += "/cpu_clock_period_num_rank_"; + dump_file_path += std::to_string(rank_); + std::string clock_period_num_str = std::to_string(std::chrono::steady_clock::duration::period::num); + auto clock_period_num_file = std::fstream(dump_file_path, std::ios::out); + clock_period_num_file.write(clock_period_num_str.c_str(), clock_period_num_str.length()); + clock_period_num_file.close(); + + dump_file_path = dump_dir; + dump_file_path += "/cpu_clock_period_den_rank_"; + dump_file_path += std::to_string(rank_); + std::string clock_period_den_str = std::to_string(std::chrono::steady_clock::duration::period::den); + auto clock_period_den_file = std::fstream(dump_file_path, std::ios::out); + clock_period_den_file.write(clock_period_den_str.c_str(), clock_period_den_str.length()); + clock_period_den_file.close(); + + // Dump GPU events, reuse CPU struct + for (i = 0; i < kNumGpuEventBuffers; i++) { + dump_file_path = dump_dir; + dump_file_path += "/gpu_events_rank_"; + dump_file_path += std::to_string(rank_); + dump_file_path += "_buf_"; + dump_file_path += std::to_string(i); + MSCCLPPCHECK(mscclppCudaMemcpy(cpu_event_buffers_[0], gpu_event_buffers_[i], kMaxNumGpuEventsPerBuffer)); + MSCCLPPCHECK(mscclppCudaMemcpy(cpu_collect_contexts_, gpu_collect_contexts_ + i, 1)); + auto gpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); + gpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[0]), + cpu_collect_contexts_[0].event_buffer_head * sizeof(NpKitEvent)); + gpu_trace_file.close(); + } + + // Dump GPU clockRate + dump_file_path = dump_dir; + dump_file_path += "/gpu_clock_rate_rank_"; + dump_file_path += std::to_string(rank_); + cudaDeviceProp dev_prop; + int dev; + CUDACHECK(cudaGetDevice(&dev)); + CUDACHECK(cudaGetDeviceProperties(&dev_prop, dev)); + std::string clock_rate_str = std::to_string(dev_prop.clockRate); + auto gpu_clock_rate_file = std::fstream(dump_file_path, std::ios::out); + gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length()); + gpu_clock_rate_file.close(); + + return mscclppSuccess; +} + +mscclppResult_t NpKit::Shutdown() { + uint64_t i = 0; + + // Stop CPU timestamp updating thread + cpu_timestamp_update_thread_should_stop_ = true; + cpu_timestamp_update_thread_->join(); + + // Free CPU event data structures + for (i = 0; i < kNumCpuEventBuffers; i++) { + free(cpu_event_buffers_[i]); + } + free(cpu_event_buffers_); + free(cpu_collect_contexts_); + + // Free GPU event data structures + for (i = 0; i < kNumGpuEventBuffers; i++) { + CUDACHECK(cudaFree(gpu_event_buffers_[i])); + } + free(gpu_event_buffers_); + CUDACHECK(cudaFree(gpu_collect_contexts_)); + + // Free timestamp + MSCCLPPCHECK(mscclppCudaHostFree(cpu_timestamp_)); + + return mscclppSuccess; +} + +NpKitEventCollectContext* NpKit::GetGpuEventCollectContexts() { + return gpu_collect_contexts_; +} + +void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id) { + uint64_t event_buffer_head = cpu_collect_contexts_[channel_id].event_buffer_head; + if (event_buffer_head < kMaxNumCpuEventsPerBuffer) { + NpKitEvent& event = cpu_collect_contexts_[channel_id].event_buffer[event_buffer_head]; + event.fields.type = type; + event.fields.size = size; + event.fields.rsvd = rsvd; + event.fields.timestamp = timestamp; + cpu_collect_contexts_[channel_id].event_buffer_head++; + } +} + +uint64_t* NpKit::GetCpuTimestamp() { + return cpu_timestamp_; +} \ No newline at end of file diff --git a/src/proxy.cc b/src/proxy.cc index a6bd39b3..31583edd 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -12,6 +12,10 @@ #include #include +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif + #define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100 // TODO(chhwang): verify if MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0 is useful, otherwise delete this option. #define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1 @@ -224,6 +228,11 @@ void* mscclppProxyServiceIb(void* _args) { conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, /*wrId=*/0, /*srcOffset=*/trigger.fields.srcDataOffset, /*dstOffset=*/trigger.fields.dstDataOffset, /*signaled=*/false); +#if defined(ENABLE_NPKIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize, 0 /* inflight request differentiator */, + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), trigger.fields.connId /* event collection context index */); +#endif } if (trigger.fields.type & mscclppFlag) { // My local flag is copied to the peer's proxy flag @@ -258,6 +267,11 @@ void* mscclppProxyServiceIb(void* _args) { if (wc->opcode == IBV_WC_RDMA_WRITE) { // send completion waiting = false; +#if defined(ENABLE_NPKIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_IB_SEND_EXIT, (uint32_t)trigger.fields.dataSize, 0 /* inflight request differentiator */, + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), trigger.fields.connId /* event collection context index */); +#endif break; } } diff --git a/tools/npkit/build_and_run_npkit.sh b/tools/npkit/build_and_run_npkit.sh new file mode 100644 index 00000000..0e9cd4eb --- /dev/null +++ b/tools/npkit/build_and_run_npkit.sh @@ -0,0 +1,21 @@ +set -e + +MSCCLPP_SRC_DIR="/mnt/mscclpp" +NPKIT_RUN_DIR="/mnt/npkit_run" +MPI_HOME="/usr/local/mpi" +HOSTFILE="hostfile" +LEADER_IP_PORT="10.6.0.4:50000" + +cd ${MSCCLPP_SRC_DIR} +make clean +MPI_HOME=${MPI_HOME} make -j NPKIT=1 + +parallel-ssh -h ${HOSTFILE} "rm -rf ${NPKIT_RUN_DIR}" +parallel-ssh -h ${HOSTFILE} "mkdir -p ${NPKIT_RUN_DIR}" +parallel-scp -r -h ${HOSTFILE} ${MSCCLPP_SRC_DIR} ${NPKIT_RUN_DIR} +parallel-ssh -h ${HOSTFILE} "mkdir -p ${NPKIT_RUN_DIR}/npkit_dump" +parallel-ssh -h ${HOSTFILE} "mkdir -p ${NPKIT_RUN_DIR}/npkit_trace" + +mpirun --allow-run-as-root -hostfile ${HOSTFILE} -map-by ppr:8:node --bind-to numa -x LD_PRELOAD=${NPKIT_RUN_DIR}/mscclpp/build/lib/libmscclpp.so -x MSCCLPP_DEBUG=WARN -x NPKIT_DUMP_DIR=${NPKIT_RUN_DIR}/npkit_dump ${NPKIT_RUN_DIR}/mscclpp/build/bin/tests/allgather_test -ip_port ${LEADER_IP_PORT} -kernel 0 + +parallel-ssh -h ${HOSTFILE} "cd ${NPKIT_RUN_DIR}/mscclpp/tools/npkit && python npkit_trace_generator.py --npkit_dump_dir ${NPKIT_RUN_DIR}/npkit_dump --npkit_event_header_path ${NPKIT_RUN_DIR}/mscclpp/src/include/npkit/npkit_event.h --output_dir ${NPKIT_RUN_DIR}/npkit_trace" diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py new file mode 100644 index 00000000..605636b7 --- /dev/null +++ b/tools/npkit/npkit_trace_generator.py @@ -0,0 +1,228 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import argparse +import os +import json + +from queue import Queue + +def parse_npkit_event_header(npkit_event_header_path): + npkit_event_def = {'id_to_type': {}, 'type_to_id': {}} + with open(npkit_event_header_path, 'r') as f: + lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0] + line_idx = 0 + while line_idx < len(lines): + if lines[line_idx].startswith('#define NPKIT_EVENT_'): + fields = lines[line_idx].split() + if len(fields) == 3: + event_type = fields[1] + event_id = int(fields[2], 0) + npkit_event_def['type_to_id'][event_type] = event_id + npkit_event_def['id_to_type'][event_id] = event_type + line_idx += 1 + return npkit_event_def + +def trim_event_name(event_type): + list_event_type_name = event_type.split("_") + if "NPKIT" in list_event_type_name: + list_event_type_name.remove("NPKIT") + if "EVENT" in list_event_type_name: + list_event_type_name.remove("EVENT") + if "ENTRY" in list_event_type_name: + list_event_type_name.remove("ENTRY") + return "_".join(list_event_type_name) + +def parse_gpu_clock_scale(gpu_clock_file_path): + with open(gpu_clock_file_path, 'r') as f: + freq_in_khz = f.read() + return float(freq_in_khz) * 1e3 / 1e6 + +def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path): + with open(cpu_clock_num_file_path, 'r') as f: + num = float(f.read()) + with open(cpu_clock_den_file_path, 'r') as f: + den = float(f.read()) + return den / num / 1e6 + +def parse_gpu_event(event_bytes): + return { + 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), + 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), + 'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), + 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) + } + +def parse_cpu_event(event_bytes): + return { + 'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False), + 'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False), + 'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False), + 'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False) + } + +def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale): + gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx)) + raw_event_size = 16 + curr_cpu_base_time = None + curr_gpu_base_time = None + gpu_events = [] + event_type_to_seq = {} + with open(gpu_event_file_path, 'rb') as f: + raw_content = f.read() + raw_content_size = len(raw_content) + raw_content_idx = 0 + while raw_content_idx < raw_content_size: + parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) + if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU': + curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scale + curr_gpu_base_time = None + elif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU': + if curr_gpu_base_time is None: + curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale + else: + if curr_gpu_base_time is None: + curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale + event_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']] + phase = 'B' if event_type.endswith('_ENTRY') else 'E' + gpu_events.append({ + 'ph': phase, + 'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time, + 'pid': rank, + 'tid': buf_idx + 1 + }) + if phase == 'B': + if event_type not in event_type_to_seq: + event_type_to_seq[event_type] = 0 + gpu_events[-1].update({ + 'name': trim_event_name(event_type), + 'cat': 'GPU', + 'args': { + 'rank': rank, + 'buf_idx': buf_idx, + 'seq': event_type_to_seq[event_type], + 'rsvd_0': parsed_gpu_event['rsvd'], + 'size_0': parsed_gpu_event['size'] + } + }) + event_type_to_seq[event_type] += 1 + else: + gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']} + delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts'] + gpu_events[-1]['args']['bw (GB/s)'] = gpu_events[-1]['args']['size'] / delta_time / 1e3 + raw_content_idx += raw_event_size + return gpu_events + +def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale): + cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel)) + raw_event_size = 16 + cpu_events = [] + event_type_to_seq = {} + + fiber_is_usable = [] + fiber_open_ts = [] + slot_to_fiber_id = {} + channel_shift = 1000 + + with open(cpu_event_file_path, 'rb') as f: + raw_content = f.read() + raw_content_size = len(raw_content) + raw_content_idx = 0 + while raw_content_idx < raw_content_size: + parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size]) + event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']] + phase = 'B' if event_type.endswith('_ENTRY') else 'E' + cpu_events.append({ + 'ph': phase, + 'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale, + 'pid': rank + }) + slot = parsed_cpu_event['slot'] + if phase == 'B': + # Open fiber event + fiber_id = 0 + while fiber_id < len(fiber_is_usable): + if fiber_is_usable[fiber_id]: + break + fiber_id += 1 + if fiber_id == len(fiber_is_usable): + fiber_is_usable.append(True) + fiber_open_ts.append(0.0) + slot_to_fiber_id[slot] = fiber_id + fiber_open_ts[fiber_id] = cpu_events[-1]['ts'] + fiber_is_usable[fiber_id] = False + + if event_type not in event_type_to_seq: + event_type_to_seq[event_type] = 0 + cpu_events[-1].update({ + 'name': trim_event_name(event_type), + 'cat': 'CPU', + 'args': { + 'rank': rank, + 'channel': channel, + 'slot': parsed_cpu_event['slot'], + 'seq': event_type_to_seq[event_type], + 'size_0': parsed_cpu_event['size'] + } + }) + event_type_to_seq[event_type] += 1 + else: + # Close fiber event + fiber_id = slot_to_fiber_id[slot] + slot_to_fiber_id.pop(slot) + last_ts = fiber_open_ts[fiber_id] + fiber_is_usable[fiber_id] = True + + delta_time = max(0.001, cpu_events[-1]['ts'] - last_ts) + cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']} + cpu_events[-1]['args']['bw (GB/s)'] = \ + cpu_events[-1]['args']['size'] / delta_time / 1e3 + + cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shift + + raw_content_idx += raw_event_size + return cpu_events + +def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def): + files_in_dump_dir = next(os.walk(npkit_dump_dir))[2] + gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')] + cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')] + + ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files])) + buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files])) + channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files])) + + trace = {'traceEvents': []} + + for rank in ranks: + cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank) + cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank) + cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path) + + gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank) + gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path) + + for buf_idx in buf_indices: + gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale) + trace['traceEvents'].extend(gpu_events) + + for channel in channels: + cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale) + trace['traceEvents'].extend(cpu_events) + + trace['traceEvents'].sort(key=lambda x : x['ts']) + trace['displayTimeUnit'] = 'ns' + + os.makedirs(output_dir, exist_ok=True) + with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f: + json.dump(trace, f) + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.') + parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.') + parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.') + args = parser.parse_args() + + npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path) + convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def) \ No newline at end of file