Merge branch 'cpp-api' into saemal/api-extension

This commit is contained in:
Madan Musuvathi
2023-04-13 22:32:38 +00:00
13 changed files with 640 additions and 117 deletions

2
.gitignore vendored
View File

@@ -1,5 +1,7 @@
.vscode/
.hypothesis/
build/
dist/
__pycache__
.*.swp
.idea/

View File

@@ -162,7 +162,10 @@ INCLUDE := -Isrc -Isrc/include
all: build
build: lib tests mscclpp-test
build: lib tests
ifeq ($(USE_MPI_FOR_TESTS), 0)
build += mscclpp-test
endif
lib: $(LIBOBJTARGETS) $(INCTARGETS) $(LIBTARGET)

View File

@@ -3,21 +3,21 @@ cmake_minimum_required(VERSION 3.18...3.22)
find_package(Python 3.9 COMPONENTS Interpreter Development.Module REQUIRED)
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()
set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif ()
# Create CMake targets for all Python components needed by nanobind
if (CMAKE_VERSION VERSION_GREATER_EQUAL 3.26)
find_package(Python 3.8 COMPONENTS Interpreter Development.Module Development.SABIModule REQUIRED)
else()
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
endif()
find_package(Python 3.8 COMPONENTS Interpreter Development.Module Development.SABIModule REQUIRED)
else ()
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
endif ()
# Detect the installed nanobind package and import it into CMake
execute_process(
COMMAND "${Python_EXECUTABLE}" -c "import nanobind; print(nanobind.cmake_dir())"
OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE NB_DIR)
COMMAND "${Python_EXECUTABLE}" -c "import nanobind; print(nanobind.cmake_dir())"
OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE NB_DIR)
list(APPEND CMAKE_PREFIX_PATH "${NB_DIR}")
find_package(nanobind CONFIG REQUIRED)
@@ -26,47 +26,36 @@ set(CUDA_DIR "/usr/local/cuda")
set(MSCCLPP_DIR ${CMAKE_CURRENT_LIST_DIR}/../build)
nanobind_add_module(
_py_mscclpp
NOSTRIP
NB_STATIC
src/_py_mscclpp.cpp
_py_mscclpp
NOSTRIP
NB_STATIC
src/_py_mscclpp.cpp
)
target_include_directories(
_py_mscclpp
PUBLIC
${CUDA_DIR}/include
${MSCCLPP_DIR}/include
_py_mscclpp
PUBLIC
${CUDA_DIR}/include
${MSCCLPP_DIR}/include
)
target_link_directories(
_py_mscclpp
PUBLIC
${CUDA_DIR}/lib
${MSCCLPP_DIR}/lib
_py_mscclpp
PUBLIC
${CUDA_DIR}/lib
${MSCCLPP_DIR}/lib
)
target_link_libraries(
_py_mscclpp
PUBLIC
mscclpp
_py_mscclpp
PUBLIC
mscclpp
)
add_custom_target(build-package ALL DEPENDS _py_mscclpp)
add_custom_command(
TARGET build-package POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${CMAKE_CURRENT_SOURCE_DIR}/src/mscclpp
${CMAKE_CURRENT_BINARY_DIR}/mscclpp)
add_custom_command(
TARGET build-package POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy
$<TARGET_FILE:_py_mscclpp>
${CMAKE_CURRENT_BINARY_DIR}/mscclpp/)
add_custom_command(
TARGET build-package POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy
${MSCCLPP_DIR}/lib/libmscclpp.so
${CMAKE_CURRENT_BINARY_DIR}/mscclpp/)
TARGET build-package POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy
${MSCCLPP_DIR}/lib/libmscclpp.so
${CMAKE_CURRENT_BINARY_DIR})

View File

@@ -1,77 +1,30 @@
# Python bindings
Test instructions:
* Compile the `libmscclpp.so` library.
* Install `cmake` verion >= 3.18
* setup a python virtual env
* `pip install -r requirements.txt`
* `./tesh.sh`
Rough build attemtps
```
# cd to this directory:
* Compile the `libmscclpp.so` library.
* Install `cmake` verion >= 3.18
* setup a python virtual env
* `pip install -r dev-requirements.txt`
* `./tesh.sh`
# setup/enter pyenv environment for python 3.9
## Run CI:
# install nanabind and the test requirements.
pip install -r requirements.txt
# setup and build the CMake environments.
# this requires nanobind, installed above.
./setup.sh
# test the module
pytest build/mscclpp
```bash
./ci.sh
```
## Build a wheel:
## Installing `gdrcopy` and `mpi`
This assumes that some things are built/installed
Setup dev environment, then:
```bash
python setup.py bdist_wheel
```
# assumes WORKDIR has:
# git clone git@github.com/NVIDIA/gdrcopy.git
# git clone git@github.com:microsoft/mscclpp.git
uname -r
# 5.4.0-1090-azure
# install
# break /usr/sbin/policy-rc.d so we can install modules
echo '#!/bin/sh
exit 0' > /usr/sbin/policy-rc.d
apt update
apt install -y \
build-essential devscripts debhelper check \
libsubunit-dev fakeroot pkg-config dkms \
linux-headers-5.4.0-1090-azure
apt install -y nvidia-dkms-525-server
cd $WORKDIR/gdrcopy
sed -i 's/\(-L \$(CUDA)\/lib64\)/\1 \1\/stubs/' tests/Makefile
cd packages
CUDA=/usr/local/cuda ./build-deb-packages.sh
dpkg -i gdrdrv-dkms_2.3-1_amd64.Ubuntu20_04.deb
dpkg -i libgdrapi_2.3-1_amd64.Ubuntu20_04.deb
dpkg -i gdrcopy-tests_2.3-1_amd64.Ubuntu20_04+cuda11.6.deb
dpkg -i gdrcopy_2.3-1_amd64.Ubuntu20_04.deb
# validate:
# $ sanity
# Running suite(s): Sanity
# 100%: Checks: 27, Failures: 0, Errors: 0
# dkms install -m gdrdrv/2.3
cd $WORKDIR/mscclpp
## Installing mpi and numa libs.
```
## numctl
apt install -y numactl libnuma-dev libnuma1
# if not mpi testing
USE_MPI_FOR_TESTS=0 make -j
```

View File

@@ -17,7 +17,7 @@ python -m venv .venv
source .venv/bin/activate
# install venv deps.
pip install -r requirements.txt
pip install -r dev-requirements.txt
# run the build and test.
./test.sh

View File

@@ -9,4 +9,3 @@ PyHamcrest
nanobind
torch
numpy

83
python/setup.py Normal file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python
import os
import shutil
import sys
import logging
from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext
import subprocess
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
class CustomExt(Extension):
def __init__(self, name):
# don't invoke the original build_ext for this special extension
super().__init__(name, sources=[])
class custom_build_ext(build_ext):
def run(self):
for ext in self.extensions:
if isinstance(ext, CustomExt):
self.build_extension(ext)
else:
super().run()
def build_extension(self, ext):
if sys.platform == "darwin":
return
# these dirs will be created in build_py, so if you don't have
# any python sources to bundle, the dirs will be missing
build_temp = os.path.abspath(self.build_temp)
os.makedirs(build_temp, exist_ok=True)
try:
subprocess.check_output(
["cmake", "-S", THIS_DIR, "-B", build_temp],
stderr=subprocess.STDOUT,
)
subprocess.check_output(
["cmake", "--build", build_temp],
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
logging.error(e.output.decode())
raise
libname = os.path.basename(self.get_ext_fullpath(ext.name))
target_dir = os.path.join(
os.path.dirname(self.get_ext_fullpath(ext.name)),
"mscclpp",
)
shutil.copy(
os.path.join(build_temp, "libmscclpp.so"),
target_dir,
)
shutil.copy(
os.path.join(build_temp, libname),
target_dir,
)
setup(
name='mscclpp',
version='0.1.0',
description='Python bindings for mscclpp',
# packages=['mscclpp'],
package_dir={'': 'src'},
packages=find_packages(where='./src'),
ext_modules=[CustomExt('_py_mscclpp')],
cmdclass={
"build_ext": custom_build_ext,
},
install_requires=[
'torch',
'nanobind',
],
)

View File

@@ -1,6 +0,0 @@
#!/bin/bash
set -ex
cmake -S . -B build
cmake --build build --clean-first -v

View File

@@ -2,11 +2,7 @@
set -ex
if ! [ -d build ] ; then
./setup.sh
fi
pip install -e .
cmake --build build
cd build
pytest -s mscclpp
cd src
pytest -vs mscclpp

2
src/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# Python in-place installs move the .so files into the source directories.
*.so

51
src/communicator.cc Normal file
View File

@@ -0,0 +1,51 @@
#include "mscclpp.hpp"
#include "mscclpp.h"
namespace mscclpp {
struct Communicator::impl {
mscclppComm_t comm;
};
void Communicator::initRank(int nranks, const char* ipPortPair, int rank) {
}
void Communicator::initRankFromId(int nranks, UniqueId id, int rank) {
}
void Communicator::bootstrapAllGather(void* data, int size) {
}
void Communicator::bootstrapBarrier() {
}
std::shared_ptr<HostConnection> Communicator::connect(int remoteRank, int tag, void* localBuff, uint64_t buffSize,
TransportType transportType, const char* ibDev = 0) {
}
void Communicator::connectionSetup() {
}
void Communicator::destroy() {
}
int Communicator::rank() {
}
int Communicator::size() {
}
void Communicator::setBootstrapConnTimeout(unsigned timeout) {
}
} // namespace mscclpp

398
src/include/mscclpp.hpp Normal file
View File

@@ -0,0 +1,398 @@
#ifndef MSCCLPP_HPP_
#define MSCCLPP_HPP_
#define MSCCLPP_MAJOR 0
#define MSCCLPP_MINOR 1
#define MSCCLPP_PATCH 0
#define MSCCLPP_VERSION (MSCCLPP_MAJOR * 10000 + MSCCLPP_MINOR * 100 + MSCCLPP_PATCH)
// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
#define MSCCLPP_PROXY_FIFO_SIZE 128
#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4
#include <vector>
#include <memory>
#include <mscclppfifo.hpp>
namespace mscclpp {
struct alignas(16) SignalEpochId {
// every signal(), increaments this and either:
// 1) proxy thread pushes it to the remote peer's localSignalEpochId->proxy
// 2) gpu thread directly writes it to remoteSignalEpochId->device
uint64_t device;
// signal() function triggers the cpu proxy thread to write to it
uint64_t proxy;
};
enum ChannelTriggerType : uint64_t {
channelTriggerData = 0x1,
channelTriggerFlag = 0x2,
channelTriggerSync = 0x4
};
// This is just a numeric ID. Each HostConnection will have an internal array indexed by these handles
// mapping to the actual
using BufferHandle = uint8_t;
#define MSCCLPP_BITS_SIZE 32
#define MSCCLPP_BITS_OFFSET 32
#define MSCCLPP_BITS_BUFFER_HANDLE 8
#define MSCCLPP_BITS_TYPE 3
#define MSCCLPP_BITS_CONNID 10
// this is the basic structure of each work element in the fifo
// the summation of number of bits must be 128 or less
union ChannelTrigger {
ProxyTrigger value;
struct
{
// first 64 bits: value[0]
uint64_t size : MSCCLPP_BITS_SIZE;
uint64_t srcOffset : MSCCLPP_BITS_OFFSET;
uint64_t : (64 - MSCCLPP_BITS_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment
// second 64 bits: value[1]
uint64_t dstOffset : MSCCLPP_BITS_OFFSET;
uint64_t srcBufferHandle : MSCCLPP_BITS_BUFFER_HANDLE;
uint64_t dstBufferHandle : MSCCLPP_BITS_BUFFER_HANDLE;
uint64_t type : MSCCLPP_BITS_TYPE;
uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_BUFFER_HANDLE - MSCCLPP_BITS_BUFFER_HANDLE - MSCCLPP_BITS_TYPE); // ensure 64-bit alignment
} fields;
ChannelTrigger() {}
ChannelTrigger(ProxyTrigger value) : value(value) {}
ChannelTrigger(ChannelTriggerType type, BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size) {
value.fst = ((srcOffset << MSCCLPP_BITS_SIZE) + size);
value.snd = (((((((uint64_t)type << MSCCLPP_BITS_BUFFER_HANDLE) + dst) << MSCCLPP_BITS_BUFFER_HANDLE) + src) << MSCCLPP_BITS_OFFSET) + dstOffset);
}
};
/***************************************************************************************************************
* A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand.
* The communication API is one-sided meaning that for every single data transfer, only one side
* needs to execute unlike a two-sided communication stack such as NCCL where both sides
* need to execute a send and a receive instruction, respectively, for every transfer.
*
* A connection is uniquely identified by the (remoteRank, tag) pair at an endpoint.
* The two endpoints register buffers of the same size with the connection.
*
* The endpoints provide the remoteRank, tag, and the buffer when registering a connection with msccppConnect().
*
* mscllppConnectionSetup() sets up all the registered connections.
*
***************************************************************************************************************
* A proxy thread running on the CPU is necessary to perform transfers using InfiniBand or the DMA engine.
* The current implementation uses a single proxy thread per context - one IB connection or DMA engine per node.
* Thus multiple threadblocks using different connections might use the same CPU proxy thread.
*
* Before using any of functionality of connections, mscclppProxyLaunch needs to be called to spawn the
* proxy threads. There are currently two types of connections:
*
* P2P via NVLink: the DMA engine can perform the copy between the buffers. DMA engine has higher latency
* but has a higher bandwidth and costs no compute cycles on the GPU.
*
* InfiniBand: the RDMA engine copies the data over MLX devices.
*
***************************************************************************************************************
* At the runtime, a GPU kernel has access to a mscclppDevConn object that provides the following functions:
*
* put(): [non-blocking] the sender initiates a data transfer to the receiver.
*
* signal(): [non-blocking] the sender signals the receiver that data is ready to be consumed.
*
* flush(): [blocking] the sender waits for all the data transfers to complete
*
* wait(): [blocking] the reciever waits on the signal() to start reading the data.
*
* The sender should not reuse the buffer till the flush() returns.
* The receiver should only access the data after the wait() returns.
*
* putWithSignal(): the sender initiates a data transfer and signals the receiver that data is ready to be consumed.
* This is an optimized version of a put() followed by a signal().
*
* These functions hide the complexity of syncrhonization between the two GPUs and the CPU proxy thread.
* Example:
*
* // sender GPU
* devConn.put(data1)
* // not OK to write to data1
* devConn.put(data2)
* // not OK to write to data1, data2
* devConn.put(data3) // receiver GPU
* // not OK to write to data1, data2, data3 // not OK to read data1, data2, data3
* devConn.signal() -------------------------------> devConn.wait()
* // not OK to write to data1, data2, data3 // OK to read data1, data2, data3
* devConn.flush()
* // OK to write to data1, data2, data3
*
*
* The two endpoint can concurrently use the same connection provided they are writing (puts) on different
* indices in the registered buffer.
**************************************************************************************************************/
struct DeviceConnection {
#ifdef __CUDACC__
// TODO: add buffer handles
__forceinline__ __device__ void put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size)
{
fifo.push(ChannelTrigger(channelTriggerData, dst, dstOffset, src, srcOffset, size).value);
}
__forceinline__ __device__ void put(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size)
{
put(dst, offset, src, offset, size);
}
__forceinline__ __device__ void signal()
{
epochIncrement();
fifo.push(ChannelTrigger(channelTriggerFlag, 0, 0, 0, 0, 1).value);
}
__forceinline__ __device__ void putWithSignal(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size)
{
epochIncrement();
fifo.push(ChannelTrigger(channelTriggerData | channelTriggerFlag, dst, dstOffset, src, srcOffset, size).value);
}
__forceinline__ __device__ void putWithSignal(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size)
{
putWithSignal(dst, offset, src, offset, size);
}
__forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size)
{
epochIncrement();
uint64_t curFifoHead = fifo.push(channelTriggerData | channelTriggerFlag | channelTriggerSync, dstOffset, srcOffset, size);
while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
__forceinline__ __device__ void putWithSignalAndFlush(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size)
{
putWithSignalAndFlush(offset, offset, size);
}
__forceinline__ __device__ void flush()
{
uint64_t curFifoHead = fifo.push(mscclppSync, 0, 0, 1);
// we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0.
while (*(volatile uint64_t*)&fifo.triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo.triggerFifoTail <= curFifoHead)
;
}
__forceinline__ __device__ void wait()
{
(*waitEpochId) += 1;
while (*(volatile uint64_t*)&(localSignalEpochId->proxy) < (*waitEpochId))
;
}
__forceinline__ __device__ void epochIncrement()
{
*(volatile uint64_t*)&(localSignalEpochId->device) += 1;
}
#endif // __CUDACC__
int remoteRank;
int tag;
SignalEpochId* localSignalEpochId;
// used by the signal() function directly from gpu
SignalEpochId* remoteSignalEpochId;
// every wait(), increments this and then the gpu waits for either:
// 1) localSignalEpochId->proxy to be >= this in case of a proxy thread
// 2) remoteSignalEpochId->device to be >= this in case of a gpu thread
uint64_t* waitEpochId;
// this is a concurrent fifo which is multiple threads from the device
// can produce for and the sole proxy thread consumes it.
ProxyFifo fifo;
};
class HostConnection {
public:
/* Register a region of GPU memory for use with this connection. Must be called before connectionSetup()
* in the communicator.
*
* Inputs:
* data: base pointer to the memory
* size: size of the memory region in bytes
*
* Returns: a handle to the buffer
*/
BufferHandle registerBuffer(void* data, uint64_t size);
/* Get the number of times registerBuffer(...) was called on the remote peer.
*
* Returns: the number of buffers registered on the remote peer
*/
int numRemoteBuffers();
/* Get the BufferHandle returned by a call to registerBuffer(...) on the remote peer as identified by the index
*
* Inputs:
* index: the index of the handle to get
*
* Returns: a handle to the buffer on the remote peer
*/
BufferHandle getRemoteBuffer(int index);
/* Create a DeviceConnection paired with this HostConnection. A background proxy thread will
* trigger operations on this HostConnection corresponding to put/signal/etc. calls made to the
* DeviceConnection.
*
* Inputs:
* startProxyThread: whether to start the proxy thread (default is true)
*
* Returns: the newly created DeviceConnection
*/
DeviceConnection toDevice(bool startProxyThread = true);
void put(BufferHandle dst, uint64_t dstOffset, BufferHandle src, uint64_t srcOffset, uint64_t size);
void put(BufferHandle dst, BufferHandle src, uint64_t offset, uint64_t size);
void signal();
void flush();
void wait();
void epochIncrement();
private:
struct impl;
std::unique_ptr<impl> pimpl;
};
#define MSCCLPP_UNIQUE_ID_BYTES 128
struct UniqueId {
char internal[MSCCLPP_UNIQUE_ID_BYTES];
};
/* Create a unique ID for communication. Only needs to be called by one process.
* Use with mscclppCommInitRankFromId().
* All processes need to provide the same ID to mscclppCommInitRankFromId().
*
* Outputs:
* uniqueId: the unique ID to be created
*/
std::unique_ptr<UniqueId> getUniqueId();
/* Transport Types */
enum class TransportType : uint8_t {
P2P = 0,
IB = 1,
};
class Communicator {
public:
/* Initialize the communicator. nranks processes with rank 0 to nranks-1 need to call this function.
*
* Inputs:
* nranks: number of ranks in the communicator
* ipPortPair: a string of the form "ip:port" that represents the address of the root process
* rank: rank of the calling process
*/
void initRank(int nranks, const char* ipPortPair, int rank);
/* Initialize the communicator from a given UniqueId. Same as mscclppCommInitRank() except that
* id is provided by the user by calling getUniqueId()
*
* Inputs:
* nranks: number of ranks in the communicator
* id: the unique ID to be used for communication
* rank: rank of the calling process
*/
void initRankFromId(int nranks, UniqueId id, int rank);
/* Ring-based AllGather through the bootstrap socket.
*
* Inputs:
* data: data array to be gathered where `[r*size, (r+1)*size)` is the data for rank `r`
* size: data size per rank
*/
void bootstrapAllGather(void* data, int size);
/* A no-op function that is used to synchronize all processes via a bootstrap allgather*/
void bootstrapBarrier();
/* Connect to a remote rank. This function only prepares metadata for connection. The actual connection
* is made by a following call of mscclppConnectionSetup(). Note that this function is two-way and a connection
* from rank i to remote rank j needs to have a counterpart from rank j to rank i.
* Note that with IB, buffers are registered at a page level and if a buffer is spread through multiple pages
* and do not fully utilize all of them, IB's QP has to register for all involved pages. This potentially has
* security risks if the devConn's accesses are given to a malicious process.
*
* Inputs:
* remoteRank: the rank of the remote process
* tag: the tag of the connection. tag is copied into the corresponding mscclppDevConn_t, which can be
* used to identify the connection inside a GPU kernel.
* localBuff: the local send/receive buffer
* buffSize: the size of the local buffer
* transportType: the type of transport to be used (mscclppTransportP2P or mscclppTransportIB)
* ibDev: the name of the IB device to be used. Expects a null for mscclppTransportP2P.
*/
std::shared_ptr<HostConnection> connect(int remoteRank, int tag, void* localBuff, uint64_t buffSize,
TransportType transportType, const char* ibDev = 0);
/* Establish all connections created by mscclppConnect(). This function must be called after all mscclppConnect()
* calls are made. This function ensures that all remote ranks are ready to communicate when it returns.
*/
void connectionSetup();
/* Destroy the communicator. */
void destroy();
/* Return the rank of the calling process.
*
* Outputs:
* rank: the rank of the calling process
*/
int rank();
/* Return the number of ranks of the communicator.
*
* Outputs:
* size: the number of ranks of the communicator
*/
int size();
/* Set the timeout for the bootstrap connection.
*
* Inputs:
* timeout: the timeout in seconds
*/
void setBootstrapConnTimeout(unsigned timeout);
private:
struct impl;
std::unique_ptr<impl> pimpl;
};
/* Log handler type which is a callback function for
* however user likes to handle the log messages. Once set,
* the logger will just call this function with msg.
*/
typedef void (*LogHandler)(const char* msg);
/* The default log handler.
*
* Inputs:
* msg: the log message
*/
void defaultLogHandler(const char* msg);
/* Set a custom log handler.
*
* Inputs:
* handler: the log handler function
*/
void setLogHandler(LogHandler handler);
} // namespace mscclpp
#endif // MSCCLPP_H_

View File

@@ -0,0 +1,53 @@
#ifndef MSCCLPPFIFO_H_
#define MSCCLPPFIFO_H_
#include <stdint.h>
#include <functional>
namespace mscclpp {
struct alignas(16) ProxyTrigger {
uint64_t fst, snd;
};
/* This is a concurrent fifo where multiple device threads can push mscclppTrigger work elements to
* and a single host proxy thread consumes these work elements. There is a head pointer allocated on device
* which starts with 0 and goes to 2^64-1 which is almost infinity. There are two copies of tail, one
* that is on the deivce (triggerFifoTail) and another that is on host (proxyState->fifoTailHost).
* The host always has the "true" tail and occasionally, pushes it to the copy on the device.
* Therefore, most of the time, the device has a stale version. The invariants are:
* triggerFifoTail <= proxyState->fifoTailHost <= triggerFifoHead.
* push() function increments triggerFifoHead, proxyState->fifoTailHost is updated in proxy.cc:mscclppProxyService
* and it occasionally flushes it to triggerFifoTail via a cudaMemcpyAsync.
*
* Why duplicating the tail is a good idea? The fifo is large engouh and we do not need frequent updates
* for the tail as there is usually enough space for device threads to push their work into.
*/
struct ProxyFifo {
#ifdef __CUDACC__
__forceinline__ __device__ uint64_t push(ProxyTrigger element)
{
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->triggerFifoHead, 1);
while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->triggerFifoTail))
;
while (*(volatile uint64_t*)&this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0)
;
uint64_t* valptr = (uint64_t*)&(this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE].value);
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(valptr),
"l"(element.value[0]), "l"(element.value[1]));
return curFifoHead;
}
#endif // __CUDACC__
void startProxyThread(std::function<void(ProxyTrigger)> handler);
void stopProxyThread();
ProxyTrigger* triggerFifo; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements
uint64_t* triggerFifoTail; // Allocated on device. proxyState->fifoTailHost is the true tail on host and pused
// occasionally to device
uint64_t* triggerFifoHead; // Allocated on device. Only accessed by device
};
} // namespace mscclpp
#endif // MSCCLPPFIFO_H_