Files
mscclpp/src/connection.cc
Changho Hwang 960a8ddebd Add a new logger (#668)
* Add `logger.hpp` that will gradually replace `debug.h`
* Minor fixes in `ib.cc`

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: chhwang <8018170+chhwang@users.noreply.github.com>
2025-11-04 10:32:46 -08:00

470 lines
19 KiB
C++

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include "connection.hpp"
#if defined(ENABLE_NPKIT)
#include <mscclpp/npkit/npkit.hpp>
#endif
#include <mscclpp/env.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
#include <thread>
#include "api.h"
#include "context.hpp"
#include "endpoint.hpp"
#include "logger.hpp"
namespace mscclpp {
static void validateTransport(RegisteredMemory mem, Transport transport, uint64_t offset = 0, uint64_t size = 0) {
if (!mem.transports().has(transport)) {
throw Error("RegisteredMemory does not support this transport", ErrorCode::InvalidUsage);
}
if (offset + size > mem.size()) {
throw Error("RegisteredMemory out of bounds", ErrorCode::InvalidUsage);
}
}
static bool isSameProcess(const Endpoint& a, const Endpoint& b) {
return a.hostHash() == b.hostHash() && a.pidHash() == b.pidHash();
}
// Connection
const Endpoint::Impl& Connection::getImpl(const Endpoint& endpoint) { return *(endpoint.pimpl_); }
const RegisteredMemory::Impl& Connection::getImpl(const RegisteredMemory& memory) { return *(memory.pimpl_); }
Context::Impl& Connection::getImpl(Context& context) { return *(context.pimpl_); }
MSCCLPP_API_CPP Connection::Connection(std::shared_ptr<Context> context, const Endpoint& localEndpoint)
: context_(context), localEndpoint_(localEndpoint), maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()) {}
MSCCLPP_API_CPP std::shared_ptr<Context> Connection::context() const { return context_; }
MSCCLPP_API_CPP const Device& Connection::localDevice() const { return localEndpoint_.device(); }
MSCCLPP_API_CPP int Connection::getMaxWriteQueueSize() const { return maxWriteQueueSize_; }
// CudaIpcConnection
CudaIpcConnection::CudaIpcConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint,
const Endpoint& remoteEndpoint)
: Connection(context, localEndpoint) {
if (localEndpoint.transport() != Transport::CudaIpc || remoteEndpoint.transport() != Transport::CudaIpc) {
throw Error("CudaIpc transport is required for CudaIpcConnection", ErrorCode::InternalError);
}
if (localEndpoint.device().type == DeviceType::GPU && localEndpoint.device().id < 0) {
throw Error("No GPU device ID provided for local endpoint", ErrorCode::InternalError);
}
if (remoteEndpoint.device().type == DeviceType::GPU && remoteEndpoint.device().id < 0) {
throw Error("No GPU device ID provided for remote endpoint", ErrorCode::InternalError);
}
int localDeviceId = localEndpoint.device().id;
int remoteDeviceId = remoteEndpoint.device().id;
if (localEndpoint.device().type != DeviceType::GPU && remoteEndpoint.device().type != DeviceType::GPU) {
throw Error("CudaIpcConnection requires at least one GPU endpoint", ErrorCode::InvalidUsage);
} else if (localEndpoint.device().type == DeviceType::GPU && remoteEndpoint.device().type == DeviceType::GPU) {
if (isSameProcess(localEndpoint, remoteEndpoint) && localDeviceId != remoteDeviceId) {
// Connecting two GPUs in the same process - need to enable peer access explicitly
int originalDeviceId;
MSCCLPP_CUDATHROW(cudaGetDevice(&originalDeviceId));
if (originalDeviceId != localDeviceId) {
MSCCLPP_CUDATHROW(cudaSetDevice(localDeviceId));
}
auto ret = cudaDeviceEnablePeerAccess(remoteDeviceId, 0);
if (ret != cudaSuccess && ret != cudaErrorPeerAccessAlreadyEnabled) {
MSCCLPP_CUDATHROW(ret);
}
if (originalDeviceId != localDeviceId) {
MSCCLPP_CUDATHROW(cudaSetDevice(originalDeviceId));
}
}
}
int streamDeviceId = (localEndpoint.device().type == DeviceType::GPU) ? localDeviceId : remoteDeviceId;
auto& ctxImpl = getImpl(*context);
#if defined(MSCCLPP_DEVICE_HIP)
ctxImpl.ipcStreams_.emplace_back(std::make_shared<CudaIpcStream>(streamDeviceId));
#else // !defined(MSCCLPP_DEVICE_HIP)
if (ctxImpl.ipcStreams_.empty()) {
ctxImpl.ipcStreams_.emplace_back(std::make_shared<CudaIpcStream>(streamDeviceId));
}
#endif // !defined(MSCCLPP_DEVICE_HIP)
stream_ = ctxImpl.ipcStreams_.back();
}
Transport CudaIpcConnection::transport() const { return Transport::CudaIpc; }
Transport CudaIpcConnection::remoteTransport() const { return Transport::CudaIpc; }
void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);
char* dstPtr = (char*)dst.data();
char* srcPtr = (char*)src.data();
stream_->memcpyD2D(dstPtr + dstOffset, srcPtr + srcOffset, size);
INFO(CONN, "CudaIpcConnection write: from ", srcPtr + srcOffset, " to ", dstPtr + dstOffset, ", size ", size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
validateTransport(dst, remoteTransport());
uint64_t oldValue = *src;
*src = newValue;
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);
stream_->memcpyH2D(dstPtr + dstOffset, src, sizeof(uint64_t));
INFO(CONN, "CudaIpcConnection atomic write: from ", src, " to ", dstPtr + dstOffset, ", ", oldValue, " -> ",
newValue);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void CudaIpcConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
if (timeoutUsec >= 0) {
INFO(CONN, "CudaIpcConnection flush: timeout is not supported, ignored");
}
stream_->sync();
INFO(CONN, "CudaIpcConnection flushing connection");
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
// IBConnection
IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint,
const Endpoint& remoteEndpoint)
: Connection(context, localEndpoint),
transport_(localEndpoint.transport()),
remoteTransport_(remoteEndpoint.transport()),
dummyAtomicSource_(std::make_unique<uint64_t>(0)) {
qp_ = getImpl(localEndpoint).ibQp_;
qp_.lock()->rtr(getImpl(remoteEndpoint).ibQpInfo_);
qp_.lock()->rts();
dummyAtomicSourceMem_ = context->registerMemory(dummyAtomicSource_.get(), sizeof(uint64_t), transport_);
validateTransport(dummyAtomicSourceMem_, transport_);
dstTransportInfo_ = getImpl(dummyAtomicSourceMem_).getTransportInfo(transport_);
INFO(CONN, "IBConnection via ", getIBDeviceName(transport_), " created");
}
Transport IBConnection::transport() const { return transport_; }
Transport IBConnection::remoteTransport() const { return remoteTransport_; }
void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);
auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
if (dstTransportInfo.ibLocal) {
throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
}
auto srcTransportInfo = getImpl(src).getTransportInfo(transport());
if (!srcTransportInfo.ibLocal) {
throw Error("src is remote, which is not supported", ErrorCode::InvalidUsage);
}
auto dstMrInfo = dstTransportInfo.ibMrInfo;
auto srcMr = srcTransportInfo.ibMr;
qp_.lock()->stageSend(srcMr, dstMrInfo, (uint32_t)size, /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset,
/*signaled=*/true);
qp_.lock()->postSend();
INFO(CONN, "IBConnection write: from ", (uint8_t*)srcMr->getBuff() + srcOffset, " to ",
(uint8_t*)dstMrInfo.addr + dstOffset, ", size ", size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
validateTransport(dst, remoteTransport());
auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
if (dstTransportInfo.ibLocal) {
throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
}
auto dstMrInfo = dstTransportInfo.ibMrInfo;
// assert that src is on host
uint64_t oldValue = *src;
*src = newValue;
qp_.lock()->stageAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, newValue - oldValue,
/*signaled=*/true);
qp_.lock()->postSend();
INFO(CONN, "IBConnection atomic Write: from ", src, " to ", (uint8_t*)dstMrInfo.addr + dstOffset, ", ", oldValue,
" -> ", newValue);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void IBConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
Timer timer;
while (qp_.lock()->getNumCqItems()) {
int wcNum = qp_.lock()->pollCq();
if (wcNum < 0) {
throw mscclpp::IbError("pollCq failed: error no " + std::to_string(errno), errno);
} else if (timeoutUsec >= 0) {
auto elapsed = timer.elapsed();
if (elapsed > timeoutUsec) {
throw Error("pollCq timed out: waited for " + std::to_string(elapsed / 1e6) + " seconds. Expected " +
std::to_string(qp_.lock()->getNumCqItems()) + " signals",
ErrorCode::Timeout);
}
}
for (int i = 0; i < wcNum; ++i) {
int status = qp_.lock()->getWcStatus(i);
if (status != static_cast<int>(WsStatus::Success)) {
throw mscclpp::IbError("a work item failed: status " + std::to_string(status), status);
}
}
}
INFO(CONN, "IBConnection flushing connection");
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
// EthernetConnection
EthernetConnection::EthernetConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint,
const Endpoint& remoteEndpoint, uint64_t sendBufferSize, uint64_t recvBufferSize)
: Connection(context, localEndpoint),
abortFlag_(0),
sendBufferSize_(sendBufferSize),
recvBufferSize_(recvBufferSize) {
// Validating Transport Protocol
if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) {
throw Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage);
}
// Instanciating Buffers
sendBuffer_.resize(sendBufferSize_);
recvBuffer_.resize(recvBufferSize_);
// Creating Thread to Accept the Connection
auto parameter = getImpl(localEndpoint).socket_.get();
std::thread t([this, parameter]() {
recvSocket_ = std::make_unique<Socket>(nullptr, MSCCLPP_SOCKET_MAGIC, SocketTypeUnknown, abortFlag_);
recvSocket_->accept(parameter);
});
// Starting Connection
sendSocket_ = std::make_unique<Socket>(&(getImpl(remoteEndpoint).socketAddress_), MSCCLPP_SOCKET_MAGIC,
SocketTypeBootstrap, abortFlag_);
sendSocket_->connect();
// Ensure the Connection was Established
t.join();
// Starting Thread to Receive Messages
int deviceId = -1;
MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));
threadRecvMessages_ = std::thread([deviceId, this]() {
MSCCLPP_CUDATHROW(cudaSetDevice(deviceId));
this->recvMessages();
});
INFO(CONN, "Ethernet connection created");
}
EthernetConnection::~EthernetConnection() {
sendSocket_->close();
recvSocket_->close();
threadRecvMessages_.join();
}
Transport EthernetConnection::transport() const { return Transport::Ethernet; }
Transport EthernetConnection::remoteTransport() const { return Transport::Ethernet; }
void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
// Validating Transport Protocol
validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);
// Initializing Variables
char* srcPtr = reinterpret_cast<char*>(src.data()) + srcOffset / sizeof(char);
char* dstPtr = reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset / sizeof(char);
uint64_t sentDataSize = 0;
uint64_t headerSize = 0;
// Copying Meta Data to Send Buffer
char* dstPtrBytes = reinterpret_cast<char*>(&dstPtr);
std::copy(dstPtrBytes, dstPtrBytes + sizeof(dstPtr), sendBuffer_.data() + headerSize / sizeof(char));
headerSize += sizeof(dstPtr);
char* sizeBytes = reinterpret_cast<char*>(&size);
std::copy(sizeBytes, sizeBytes + sizeof(size), sendBuffer_.data() + headerSize / sizeof(char));
headerSize += sizeof(size);
// Getting Data From GPU and Sending Message
while (sentDataSize < size) {
uint64_t dataSize =
std::min(sendBufferSize_ - headerSize / sizeof(char), (size - sentDataSize) / sizeof(char)) * sizeof(char);
uint64_t messageSize = dataSize + headerSize;
mscclpp::gpuMemcpy(sendBuffer_.data() + headerSize / sizeof(char), srcPtr + (sentDataSize / sizeof(char)), dataSize,
cudaMemcpyDeviceToHost);
sendSocket_->send(sendBuffer_.data(), messageSize);
sentDataSize += messageSize;
headerSize = 0;
}
INFO(CONN, "EthernetConnection write: from ", srcPtr, " to ", dstPtr, ", size ", size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
// Validating Transport Protocol
validateTransport(dst, remoteTransport());
// Initializing Variables
uint64_t oldValue = *src;
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset);
uint64_t dataSize = sizeof(uint64_t);
uint64_t messageSize = 0;
*src = newValue;
// Copying Data to Send Buffer
char* dstPtrBytes = reinterpret_cast<char*>(&dstPtr);
std::copy(dstPtrBytes, dstPtrBytes + sizeof(dstPtr), sendBuffer_.data() + messageSize / sizeof(char));
messageSize += sizeof(dstPtr);
char* sizeBytes = reinterpret_cast<char*>(&dataSize);
std::copy(sizeBytes, sizeBytes + sizeof(dataSize), sendBuffer_.data() + messageSize / sizeof(char));
messageSize += sizeof(dataSize);
char* dataBytes = reinterpret_cast<char*>(src);
std::copy(dataBytes, dataBytes + dataSize, sendBuffer_.data() + messageSize / sizeof(char));
messageSize += dataSize;
// Sending Message
sendSocket_->send(sendBuffer_.data(), messageSize);
INFO(CONN, "EthernetConnection atomic write: from ", src, " to ", dstPtr + dstOffset, ", ", oldValue, " -> ",
newValue);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void EthernetConnection::flush(int64_t) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
INFO(CONN, "EthernetConnection flushing connection");
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}
void EthernetConnection::recvMessages() {
// Declarating Variables
char* ptr;
uint64_t size;
uint64_t recvSize;
int closed = 0;
bool received = true;
// Receiving Messages Until Connection is Closed
while (recvSocket_->getState() != SocketStateClosed) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 1);
#endif
// Receiving Data Address
if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;
// Receiving data size
if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
received &= !closed;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
// Receiving Data and Copying Data yo GPU
recvSize = 0;
while (recvSize < size && closed == 0) {
uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char);
recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed);
received &= !closed;
if (received)
mscclpp::gpuMemcpy(ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice);
recvSize += messageSize;
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
}
}
} // namespace mscclpp