diff --git a/.gitignore b/.gitignore index e524d792..5d5eff88 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ .vscode/ build/ +__pycache__ +.*.swp +.idea/ diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 00000000..1a1abdb2 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,2 @@ +.*.swp +.venv/ diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt new file mode 100644 index 00000000..f8798f66 --- /dev/null +++ b/python/CMakeLists.txt @@ -0,0 +1,72 @@ +project(mscclpp) +cmake_minimum_required(VERSION 3.18...3.22) +find_package(Python 3.9 COMPONENTS Interpreter Development.Module REQUIRED) + +if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) + set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE) + set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo") +endif() + +# Create CMake targets for all Python components needed by nanobind +if (CMAKE_VERSION VERSION_GREATER_EQUAL 3.26) + find_package(Python 3.8 COMPONENTS Interpreter Development.Module Development.SABIModule REQUIRED) +else() + find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED) +endif() + +# Detect the installed nanobind package and import it into CMake +execute_process( + COMMAND "${Python_EXECUTABLE}" -c "import nanobind; print(nanobind.cmake_dir())" + OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE NB_DIR) +list(APPEND CMAKE_PREFIX_PATH "${NB_DIR}") +find_package(nanobind CONFIG REQUIRED) + +set(CUDA_DIR "/usr/local/cuda") + +set(MSCCLPP_DIR ${CMAKE_CURRENT_LIST_DIR}/../build) + +nanobind_add_module( + _py_mscclpp + NOSTRIP + NB_STATIC + src/_py_mscclpp.cpp +) + +target_include_directories( + _py_mscclpp + PUBLIC + ${CUDA_DIR}/include + ${MSCCLPP_DIR}/include +) +target_link_directories( + _py_mscclpp + PUBLIC + ${CUDA_DIR}/lib + ${MSCCLPP_DIR}/lib +) + +target_link_libraries( + _py_mscclpp + PUBLIC + mscclpp +) + +add_custom_target(build-package ALL DEPENDS 
_py_mscclpp) +add_custom_command( + TARGET build-package POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_CURRENT_SOURCE_DIR}/src/mscclpp + ${CMAKE_CURRENT_BINARY_DIR}/mscclpp) + +add_custom_command( + TARGET build-package POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy + $<TARGET_FILE:_py_mscclpp> + ${CMAKE_CURRENT_BINARY_DIR}/mscclpp/) + +add_custom_command( + TARGET build-package POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy + ${MSCCLPP_DIR}/lib/libmscclpp.so + ${CMAKE_CURRENT_BINARY_DIR}/mscclpp/) + diff --git a/python/Makefile b/python/Makefile new file mode 100644 index 00000000..aa7c222c --- /dev/null +++ b/python/Makefile @@ -0,0 +1,4 @@ + +test: + ./test.sh + diff --git a/python/README.md b/python/README.md new file mode 100644 index 00000000..04e90000 --- /dev/null +++ b/python/README.md @@ -0,0 +1,77 @@ +# Python bindings + +Test instructions: + * Compile the `libmscclpp.so` library. + * Install `cmake` version >= 3.18 + * setup a python virtual env + * `pip install -r requirements.txt` + * `./test.sh` + +Rough build attempts +``` +# cd to this directory: + +# setup/enter pyenv environment for python 3.9 + +# install nanobind and the test requirements. +pip install -r requirements.txt + +# setup and build the CMake environments. +# this requires nanobind, installed above. 
./setup.sh + +# test the module +pytest build/mscclpp +``` + + +## Installing `gdrcopy` and `mpi` +This assumes that some things are built/installed +``` +# assumes WORKDIR has: +# git clone git@github.com:NVIDIA/gdrcopy.git +# git clone git@github.com:microsoft/mscclpp.git + +uname -r +# 5.4.0-1090-azure + +# install + +# break /usr/sbin/policy-rc.d so we can install modules +echo '#!/bin/sh +exit 0' > /usr/sbin/policy-rc.d + +apt update +apt install -y \ + build-essential devscripts debhelper check \ + libsubunit-dev fakeroot pkg-config dkms \ + linux-headers-5.4.0-1090-azure + +apt install -y nvidia-dkms-525-server + + +cd $WORKDIR/gdrcopy +sed -i 's/\(-L \$(CUDA)\/lib64\)/\1 \1\/stubs/' tests/Makefile +cd packages +CUDA=/usr/local/cuda ./build-deb-packages.sh + +dpkg -i gdrdrv-dkms_2.3-1_amd64.Ubuntu20_04.deb +dpkg -i libgdrapi_2.3-1_amd64.Ubuntu20_04.deb +dpkg -i gdrcopy-tests_2.3-1_amd64.Ubuntu20_04+cuda11.6.deb +dpkg -i gdrcopy_2.3-1_amd64.Ubuntu20_04.deb + +# validate: +# $ sanity +# Running suite(s): Sanity +# 100%: Checks: 27, Failures: 0, Errors: 0 + +# dkms install -m gdrdrv/2.3 + +cd $WORKDIR/mscclpp + +## numactl +apt install -y numactl libnuma-dev libnuma1 + +# if not mpi testing +USE_MPI_FOR_TESTS=0 make -j +``` diff --git a/python/ci.sh b/python/ci.sh new file mode 100755 index 00000000..f3ffb86a --- /dev/null +++ b/python/ci.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# CI hook script. + +set -ex + +# CD to this directory. +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd $SCRIPT_DIR + +# clean env +rm -rf .venv build + +# setup a python virtual env +python -m venv .venv + +# activate the virtual env +source .venv/bin/activate + +# install venv deps. +pip install -r requirements.txt + +# run the build and test. 
./test.sh + diff --git a/python/format.sh b/python/format.sh new file mode 100755 index 00000000..c3b1dbac --- /dev/null +++ b/python/format.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +clang-format -style='{ + "BasedOnStyle": "google", + "BinPackParameters": false, + "BinPackArguments": false, + "AlignAfterOpenBracket": "AlwaysBreak" +}' -i src/*.cpp + diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 00000000..eaf386fd --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,3 @@ +nanobind +pytest +PyHamcrest diff --git a/python/setup.sh b/python/setup.sh new file mode 100755 index 00000000..fe080bbf --- /dev/null +++ b/python/setup.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +set -ex +cmake -S . -B build +cmake --build build --clean-first -v + diff --git a/python/src/_py_mscclpp.cpp b/python/src/_py_mscclpp.cpp new file mode 100644 index 00000000..095ff2cf --- /dev/null +++ b/python/src/_py_mscclpp.cpp @@ -0,0 +1,262 @@ +#include <mscclpp.h> +#include <nanobind/nanobind.h> +#include <nanobind/stl/string.h> + +#include <cstdio> +#include <cstring> +#include <memory> +#include <stdexcept> +#include <string> + +namespace nb = nanobind; +using namespace nb::literals; + +// This is a poorman's substitute for std::format, which is a C++20 feature. +template <typename... Args> +std::string string_format(const std::string &format, Args... args) { +// Shutup format warning. +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-security" + + // Dry-run to get the buffer size: + // Extra space for '\0' + int size_s = std::snprintf(nullptr, 0, format.c_str(), args...) + 1; + if (size_s <= 0) { + throw std::runtime_error("Error during formatting."); + } + + // allocate buffer + auto size = static_cast<size_t>(size_s); + std::unique_ptr<char[]> buf(new char[size]); + + // actually format + std::snprintf(buf.get(), size, format.c_str(), args...); + + // Build the return string. + // We don't want the '\0' inside + return std::string(buf.get(), buf.get() + size - 1); + +#pragma GCC diagnostic pop +} + +// Maybe return the value, maybe throw an exception. 
+template <typename... Args> +void checkResult( + mscclppResult_t status, const std::string &format, Args... args) { + switch (status) { + case mscclppSuccess: + return; + + case mscclppUnhandledCudaError: + case mscclppSystemError: + case mscclppInternalError: + case mscclppRemoteError: + case mscclppInProgress: + case mscclppNumResults: + throw std::runtime_error(string_format(format, args...)); + + case mscclppInvalidArgument: + case mscclppInvalidUsage: + default: + throw std::invalid_argument(string_format(format, args...)); + } +} + +// Maybe return the value, maybe throw an exception. +template <typename Val, typename... Args> +Val maybe( + mscclppResult_t status, Val val, const std::string &format, Args... args) { + checkResult(status, format, args...); + return val; +} + +// Wrapper around connection state. +struct MscclppComm { + mscclppComm_t _handle; + bool _is_open = false; + + public: + ~MscclppComm() { close(); } + + // Close should be safe to call on a closed handle. + void close() { + if (_is_open) { + checkResult(mscclppCommDestroy(_handle), "Failed to close comm channel"); + _handle = 0; + _is_open = false; + } + } + + void check_open() { + if (!_is_open) { + throw std::invalid_argument("MscclppComm is not open"); + } + } +}; + +static const std::string DOC_MscclppUniqueId = + "MSCCLPP Unique Id; used by the MPI Interface"; + +static const std::string DOC_MscclppComm = "MSCCLPP Communications Handle"; + + +NB_MODULE(_py_mscclpp, m) { + m.doc() = "Python bindings for MSCCLPP: which is not NCCL"; + + m.attr("MSCCLPP_UNIQUE_ID_BYTES") = MSCCLPP_UNIQUE_ID_BYTES; + + nb::class_<mscclppUniqueId>(m, "MscclppUniqueId") + .def_ro_static("__doc__", &DOC_MscclppUniqueId) + .def_static( + "from_context", + []() { + mscclppUniqueId uniqueId; + return maybe( + mscclppGetUniqueId(&uniqueId), + uniqueId, + "Failed to get MSCCLP Unique Id."); + }, + nb::call_guard<nb::gil_scoped_release>()) + .def_static( + "from_bytes", + [](nb::bytes source) { + if (source.size() != MSCCLPP_UNIQUE_ID_BYTES) { + throw std::invalid_argument(string_format( + "Requires 
exactly %d bytes; found %d", + MSCCLPP_UNIQUE_ID_BYTES, + source.size())); + } + + mscclppUniqueId uniqueId; + std::memcpy( + uniqueId.internal, source.c_str(), sizeof(uniqueId.internal)); + return uniqueId; + }) + .def("bytes", [](mscclppUniqueId id) { + return nb::bytes(id.internal, sizeof(id.internal)); + }); + + nb::class_<MscclppComm>(m, "MscclppComm") + .def_ro_static("__doc__", &DOC_MscclppComm) + .def_static( + "init_rank_from_address", + [](const std::string &address, int rank, int world_size) { + MscclppComm comm = {0}; + comm._is_open = true; + return maybe( + mscclppCommInitRank( + &comm._handle, world_size, address.c_str(), rank), + comm, + "Failed to initialize comms: %s rank=%d world_size=%d", + address, + rank, + world_size); + }, + nb::call_guard<nb::gil_scoped_release>(), + "address"_a, + "rank"_a, + "world_size"_a, + "Initialize comms given an IP address, rank, and world_size") + .def_static( + "init_rank_from_id", + [](const mscclppUniqueId &id, int rank, int world_size) { + MscclppComm comm = {0}; + comm._is_open = true; + return maybe( + mscclppCommInitRankFromId(&comm._handle, world_size, id, rank), + comm, + "Failed to initialize comms: %02X%s rank=%d world_size=%d", + id.internal, + rank, + world_size); + }, + nb::call_guard<nb::gil_scoped_release>(), + "id"_a, + "rank"_a, + "world_size"_a, + "Initialize comms given u UniqueID, rank, and world_size") + .def( + "opened", + [](MscclppComm &comm) { return comm._is_open; }, + "Is this comm object opened?") + .def( + "closed", + [](MscclppComm &comm) { return !comm._is_open; }, + "Is this comm object closed?") + .def( + "rank", + [](MscclppComm &comm) { + comm.check_open(); + int rank; + return maybe( + mscclppCommRank(comm._handle, &rank), + rank, + "Failed to retrieve MSCCLPP rank"); + }, + nb::call_guard<nb::gil_scoped_release>(), + "The rank of this node.") + .def( + "size", + [](MscclppComm &comm) { + comm.check_open(); + int size; + return maybe( + mscclppCommSize(comm._handle, &size), + size, + "Failed to retrieve MSCCLPP world size"); + }, + nb::call_guard<nb::gil_scoped_release>(), + "The 
world size of this node.") + .def( + "connection_setup", + [](MscclppComm &comm) { + comm.check_open(); + return maybe( + mscclppConnectionSetup(comm._handle), + true, + "Failed to settup MSCCLPP connection"); + }, + nb::call_guard<nb::gil_scoped_release>(), + "Run connection setup for MSCCLPP.") + .def( + "launch_proxy", + [](MscclppComm &comm) { + comm.check_open(); + return maybe( + mscclppProxyLaunch(comm._handle), + true, + "Failed to launch MSCCLPP proxy"); + }, + nb::call_guard<nb::gil_scoped_release>(), + "Start the MSCCLPP proxy.") + .def( + "stop_proxy", + [](MscclppComm &comm) { + comm.check_open(); + return maybe( + mscclppProxyStop(comm._handle), + true, + "Failed to stop MSCCLPP proxy"); + }, + nb::call_guard<nb::gil_scoped_release>(), + "Start the MSCCLPP proxy.") + .def( + "close", + &MscclppComm::close, + nb::call_guard<nb::gil_scoped_release>()) + .def( + "__del__", + &MscclppComm::close, + nb::call_guard<nb::gil_scoped_release>()) + .def( + "bootstrap_all_gather", + [](MscclppComm &comm, void *data, int size) { + comm.check_open(); + return maybe( + mscclppBootstrapAllGather(comm._handle, data, size), + true, + "Failed to stop MSCCLPP proxy"); + }, + nb::call_guard<nb::gil_scoped_release>()); + +} diff --git a/python/src/mscclpp/__init__.py b/python/src/mscclpp/__init__.py new file mode 100644 index 00000000..e825b92d --- /dev/null +++ b/python/src/mscclpp/__init__.py @@ -0,0 +1,13 @@ +from . 
import _py_mscclpp + +__all__ = ( + "MscclppUniqueId", + "MSCCLPP_UNIQUE_ID_BYTES", + "MscclppComm", +) + +MscclppUniqueId = _py_mscclpp.MscclppUniqueId +MSCCLPP_UNIQUE_ID_BYTES = _py_mscclpp.MSCCLPP_UNIQUE_ID_BYTES + +MscclppComm = _py_mscclpp.MscclppComm + diff --git a/python/src/mscclpp/test_mscclpp.py b/python/src/mscclpp/test_mscclpp.py new file mode 100644 index 00000000..a77707d8 --- /dev/null +++ b/python/src/mscclpp/test_mscclpp.py @@ -0,0 +1,79 @@ +import concurrent.futures +import unittest +import hamcrest + +import mscclpp + + +class UniqueIdTest(unittest.TestCase): + def test_no_constructor(self) -> None: + hamcrest.assert_that( + hamcrest.calling(mscclpp.MscclppUniqueId).with_args(), + hamcrest.raises( + TypeError, + "no constructor", + ), + ) + + def test_getUniqueId(self) -> None: + myId = mscclpp.MscclppUniqueId.from_context() + + hamcrest.assert_that( + myId.bytes(), + hamcrest.has_length(mscclpp.MSCCLPP_UNIQUE_ID_BYTES), + ) + + # from_bytes should work + copy = mscclpp.MscclppUniqueId.from_bytes(myId.bytes()) + hamcrest.assert_that( + copy.bytes(), + hamcrest.equal_to(myId.bytes()), + ) + + # bad size + hamcrest.assert_that( + hamcrest.calling(mscclpp.MscclppUniqueId.from_bytes).with_args(b'abc'), + hamcrest.raises( + ValueError, + f"Requires exactly {mscclpp.MSCCLPP_UNIQUE_ID_BYTES} bytes; found 3" + ), + ) + +def all_gather_task(rank: int, world_size: int) -> None: + comm_options = dict( + address="127.0.0.1:50000", + rank=rank, + world_size=world_size, + ) + print(f'{comm_options=}', flush=True) + + comm = mscclpp.MscclppComm.init_rank_from_address(**comm_options) + + buf = bytearray(world_size) + buf[rank] = rank + + if False: + # crashes, bad call structure.. 
+ comm.bootstrap_all_gather(memoryview(buf), world_size) + hamcrest.assert_that( + buf, + hamcrest.equal_to(b'\000\002'), + ) + + comm.close() + + +class CommsTest(unittest.TestCase): + def test_all_gather(self) -> None: + world_size = 2 + + tasks: list[concurrent.futures.Future[None]] = [] + + with concurrent.futures.ProcessPoolExecutor(max_workers=world_size) as pool: + for rank in range(world_size): + tasks.append(pool.submit(all_gather_task, rank, world_size)) + + for f in concurrent.futures.as_completed(tasks): + f.result() + + diff --git a/python/test.sh b/python/test.sh new file mode 100755 index 00000000..32b19bda --- /dev/null +++ b/python/test.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -ex + +if ! [ -d build ] ; then + ./setup.sh +fi + +cmake --build build + +pytest build/mscclpp