Revert "Add transport"

This reverts commit 692e9acd8f.
This commit is contained in:
Saeed Maleki
2023-02-07 22:22:09 +00:00
parent 692e9acd8f
commit b9a253f82a
9 changed files with 29 additions and 1045 deletions

View File

@@ -98,7 +98,6 @@ BINDIR := bin
LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc)
LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc)
LIBSRCS += $(addprefix src/p2p/,transport.cc)
LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)

View File

@@ -7,11 +7,11 @@
#ifndef MSCCLPP_COMM_H_
#define MSCCLPP_COMM_H_
#include "transport.h"
// #include "transport.h"
// #include "p2p.h"
// #include "collectives.h"
#include "proxy.h"
#include "strongstream.h"
// #include "strongstream.h"
// #if CUDART_VERSION < 9000
// struct cudaLaunchParams {
@@ -96,18 +96,18 @@
// mscclppResult_t(*fn)(struct mscclppComm* comm, struct mscclppCommCallback* cb);
// };
struct mscclppChannel {
struct mscclppChannelPeer* peers;
struct mscclppDevChannelPeer* devPeers;
struct mscclppRing ring;
int* devRingUserRanks;
struct mscclppTree tree;
struct mscclppTree collnetChain;
struct mscclppDirect collnetDirect;
int id; // index of this channel
uint32_t workFifoSent; // last used work index+1
uint64_t p2pOpCount;
};
// struct mscclppChannel {
// struct mscclppChannelPeer* peers;
// struct mscclppDevChannelPeer* devPeers;
// struct mscclppRing ring;
// int* devRingUserRanks;
// struct mscclppTree tree;
// struct mscclppTree collnetChain;
// struct mscclppDirect collnetDirect;
// int id; // index of this channel
// uint32_t workFifoSent; // last used work index+1
// uint64_t p2pOpCount;
// };
// struct mscclppWorkList {
// struct mscclppWorkList* next;
@@ -160,16 +160,16 @@ struct mscclppComm {
// // List of destructors to run when comm is destructed
// struct mscclppDestructor* destructorHead;
struct mscclppChannel channels[MAXCHANNELS];
struct mscclppPeerInfo* peerInfo;
struct mscclppTopoSystem* topo;
// struct mscclppChannel channels[MAXCHANNELS];
// struct mscclppPeerInfo* peerInfo;
// struct mscclppTopoSystem* topo;
// mscclppNet_t* mscclppNet;
// mscclppCollNet_t* mscclppCollNet;
void* bootstrap;
// Bitmasks for mscclppTransportP2pSetup
uint64_t* connectSend;
uint64_t* connectRecv;
// // Bitmasks for mscclppTransportP2pSetup
// uint64_t* connectSend;
// uint64_t* connectRecv;
uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
@@ -181,13 +181,13 @@ struct mscclppComm {
// cpu_set_t cpuAffinity; // CPU affinity of the GPU
// int node;
int nNodes;
int localRank;
int localRanks;
// int nNodes;
// int localRank;
// int localRanks;
// int maxLocalRanks;
// int* rankToNode;
// int* rankToLocalRank;
int* localRankToRank;
// int* localRankToRank;
// // localRanks and localRanktoRank for all nodes
// struct mscclppNodeRanks* nodeRanks;
@@ -199,8 +199,8 @@ struct mscclppComm {
// // Collective operation counter
// uint64_t collOpCount;
// Channels for collectives
int nChannels;
// // Channels for collectives
// int nChannels;
// // Channels (per peer) for p2p
// int p2pnChannels;
// int p2pnChannelsPerPeer;
@@ -259,8 +259,8 @@ struct mscclppComm {
// size_t channelSize; // User requested work size (bytes) for channel partitions
// Internal streams
struct mscclppStrongStream deviceStream, hostStream;
// // Internal streams
// struct mscclppStrongStream deviceStream, hostStream;
// // pools backed by comm->memPermanent
// struct mscclppMemoryPool memPool_mscclppProxyOp;

View File

@@ -1,291 +0,0 @@
/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_DEVICE_H_
#define MSCCLPP_DEVICE_H_
#include "mscclpp.h"
#include "align.h"
#include <stdint.h>
// #define MSCCLPP_NUM_FUNCTIONS 5 // Send/Recv not included for now
// typedef enum { mscclppFuncBroadcast, mscclppFuncReduce, mscclppFuncAllGather, mscclppFuncReduceScatter, mscclppFuncAllReduce, mscclppFuncSendRecv, mscclppFuncSend, mscclppFuncRecv, mscclppNumFuncs} mscclppFunc_t;
// extern const char* mscclppFuncStr[MSCCLPP_NUM_FUNCTIONS];
// #define MSCCLPP_NUM_ALGORITHMS 4 // Tree/Ring/CollNet*
// #define MSCCLPP_ALGO_TREE 0
// #define MSCCLPP_ALGO_RING 1
// #define MSCCLPP_ALGO_COLLNET_DIRECT 2
// #define MSCCLPP_ALGO_COLLNET_CHAIN 3
// extern const char* mscclppAlgoStr[MSCCLPP_NUM_ALGORITHMS];
#define MSCCLPP_NUM_PROTOCOLS 3 // Simple/LL/LL128
#define MSCCLPP_PROTO_LL 0
#define MSCCLPP_PROTO_LL128 1
#define MSCCLPP_PROTO_SIMPLE 2
extern const char* mscclppProtoStr[MSCCLPP_NUM_PROTOCOLS];
// #define MSCCLPP_MAX_OPS 2048
// #define MSCCLPP_STEPS 8
// union mscclppLLFifoLine {
// /* Flags have to be *after* data, because otherwise, an incomplete receive
// from the network may receive the flag but not the data.
// Note this is assuming that either we receive contiguous chunks of data
// (sockets) or data is written with an atomicity of 8 bytes (IB/RDMA). */
// struct {
// uint32_t data1;
// uint32_t flag1;
// uint32_t data2;
// uint32_t flag2;
// };
// uint64_t v[2];
// int4 i4;
// };
// #define WARP_SIZE 32
#define MAXCHANNELS 32
// #define MSCCLPP_MAX_NTHREADS 640
// #define MSCCLPP_SIMPLE_MAX_NTHREADS 512
// #define MSCCLPP_LL_MAX_NTHREADS 512
// #define MSCCLPP_LL_LINES_PER_THREAD 8
// #ifdef TEST_LL_CLEANUP
// #define MSCCLPP_LL_CLEAN_MASK 0x078 // Set to 0x100 to disable cleanup
// #define MSCCLPP_LL_FLAG_MAX 0x100
// #define MSCCLPP_LL_FLAG(a) ((uint32_t)((a) % MSCCLPP_LL_FLAG_MAX))
// #else
// #define MSCCLPP_LL_CLEAN_MASK 0x7ffffff8
// #define MSCCLPP_LL_FLAG(a) ((uint32_t)(a))
// #endif
// // Make sure the clean mask will last for at least MSCCLPP_NSTEPS
// static_assert(MSCCLPP_LL_CLEAN_MASK % MSCCLPP_STEPS == 0, "Invalid MSCCLPP_LL_CLEAN_MASK value");
// #define MSCCLPP_LL128_LINESIZE 128
// #define MSCCLPP_LL128_LINEELEMS (MSCCLPP_LL128_LINESIZE/sizeof(uint64_t))
// #define MSCCLPP_LL128_DATAELEMS (MSCCLPP_LL128_LINEELEMS-1)
// #define MSCCLPP_LL128_MAX_NTHREADS 640
// #define MSCCLPP_LL128_ELEMS_PER_THREAD 120
// #define MSCCLPP_LL128_SHMEM_ELEMS_PER_THREAD 8
// #define MSCCLPP_LL128_SHMEM_SIZE (MSCCLPP_LL128_SHMEM_ELEMS_PER_THREAD*MSCCLPP_LL128_MAX_NTHREADS)
// #define MSCCLPP_DIRECT_WRITE 0x01
// #define MSCCLPP_DIRECT_READ 0x02
// #define MSCCLPP_DIRECT_NIC 0x04
// #define MSCCLPP_IPC_WRITE 0x08
// #define MSCCLPP_IPC_READ 0x10
struct mscclppConnInfo {
// Regular comm mechanism
char *buffs[MSCCLPP_NUM_PROTOCOLS]; // Local for recv, remote for send
uint64_t *tail; // Local for recv, remote for send
uint64_t *head; // Local for send, remote for recv
int direct; // Direct communication
int shared; // Buffers are shared
void **ptrExchange; // Pointer exchange for direct communication
uint64_t* redOpArgExchange; // PreOp scaler exchange for direct pull case
int *sizesFifo; // Sizes fifo from GPU to proxy
int *offsFifo; // Buffer fifo from proxy to GPU
uint64_t step; // Keep where we are
uint64_t llLastCleaning;
};
struct mscclppProxyConnector {
int rank;
int localRank;
struct mscclppProxyConnection* connection;
struct mscclppComm* comm;
};
struct mscclppConnector {
int connected;
struct mscclppProxyConnector proxyConn;
struct mscclppTransportComm* transportComm;
void* transportResources;
struct mscclppConnInfo conn;
struct mscclppComm *comm;
};
struct mscclppRing {
// Shortcuts for userRanks[1] and userRanks[n-1]
int prev;
int next;
// Maps an internal mscclpp index to user-specified rank order. This is necessary
// since we need to know how the user expects data to be ordered across
// devices. Ordered from current device.
int* userRanks;
int index; // This rank's index in the ring
};
#define MSCCLPP_MAX_TREE_ARITY 3
struct mscclppTree {
int depth;
int up;
int down[MSCCLPP_MAX_TREE_ARITY];
};
#define MSCCLPP_MAX_DIRECT_ARITY 7
struct mscclppDirect {
int depth;
int out;
int nHeads;
int headRank;
int shift;
int up[MSCCLPP_MAX_DIRECT_ARITY];
int down[MSCCLPP_MAX_DIRECT_ARITY];
};
#define MSCCLPP_MAX_CONNS 2
struct mscclppChannelPeer {
struct mscclppConnector send[MSCCLPP_MAX_CONNS];
struct mscclppConnector recv[MSCCLPP_MAX_CONNS];
};
// struct mscclppDevComm;
// /* mscclppWork is to be a power of two, currently 8x64 bytes, */
// /* to make sure reads to host from the CUDA kernel are aligned. */
// /* Make sure to adjust padding at the end of mscclppWorkElem. */
// #define MSCCLPP_WORK_SIZE 512
// enum mscclppWorkType : uint8_t {
// mscclppWorkTypeUnused=0,
// mscclppWorkTypeColl=1,
// mscclppWorkTypeP2p=2,
// mscclppWorkTypeRegColl=3
// };
// enum mscclppWorkP2PType : uint8_t {
// mscclppWorkP2pTypeUnused=0,
// mscclppWorkP2pTypeSend,
// mscclppWorkP2pTypeRecv
// };
// struct mscclppWorkHeader {
// union {
// int32_t workNext; // when isLast=0: Offset from kernel argument workHead
// uint32_t doneAcks; // when isLast=1: Monotonic (mod 1<<32) ack value to send back.
// };
// uint16_t funcIndex;
// uint8_t isLast:1; // last work for this kernel
// uint8_t inFifo:1; // is this work in the fifo
// enum mscclppWorkType type;
// };
// struct mscclppWorkElem {
// union {
// uint8_t flagBits;
// struct {
// uint8_t isUsed:1, redOpArgIsPtr:1, regUsed:1;
// };
// };
// uint8_t nWarps;
// uint8_t direct;
// const void * sendbuff;
// void * recvbuff;
// size_t count;
// size_t lastChunkSize;
// uint32_t root;
// uint8_t bid;
// uint8_t nChannels;
// uint64_t redOpArg;
// };
// #define MSCCLPP_MAX_WORK_ELEMENTS ((MSCCLPP_WORK_SIZE - alignUp(sizeof(mscclppWorkHeader), alignof(mscclppWorkElem)))/sizeof(mscclppWorkElem))
// static_assert(MSCCLPP_MAX_WORK_ELEMENTS == 9, "Sanity check: MSCCLPP_MAX_WORK_ELEMENTS == 9");
// struct mscclppWorkElemP2p {
// int peer : 30;
// int proto : 2;
// enum mscclppWorkP2PType p2pType;
// uint8_t nWarps;
// uint8_t warpStart;
// uint8_t ngroups;
// // Important not to use any fields with greater than 4-byte alignment since
// // we need sizeof(mscclppWorkElemP2p)==28, but that would be padded up to 32 if
// // there were 8-byte fields.
// //void* buff;
// uint32_t buffHi32, buffLo32; // buff = buffHi32<<32 | buffLo32;
// //size_t count;
// uint32_t countHi32, countLo32; // count = countHi32<<32 | countLo32;
// int chunkSize;
// };
// static_assert(((MSCCLPP_WORK_SIZE - alignUp(sizeof(mscclppWorkHeader), alignof(mscclppWorkElemP2p)))/sizeof(mscclppWorkElemP2p)) >= 16, "Sanity check: MSCCLPP_MAX_WORK_ELEMENTS_P2P == 16");
// #define MSCCLPP_MAX_WORK_ELEMENTS_P2P 16
// struct mscclppWorkElemReg {
// struct mscclppWorkElem elem;
// void* dnInputs[MSCCLPP_MAX_DIRECT_ARITY+1];
// void* dnOutputs[MSCCLPP_MAX_DIRECT_ARITY+1];
// void* upOutputs[MSCCLPP_MAX_DIRECT_ARITY+1];
// };
// #define MSCCLPP_MAX_WORK_ELEMENTS_REG ((MSCCLPP_WORK_SIZE - alignUp(sizeof(mscclppWorkHeader), alignof(mscclppWorkElemReg)))/sizeof(mscclppWorkElemReg))
// static_assert(MSCCLPP_MAX_WORK_ELEMENTS_REG == 2, "Sanity check: MSCCLPP_MAX_WORK_ELEMENTS_REG == 2");
// // Number of named barriers supported by CUDA
// #define MSCCLPP_MAX_GROUPS 16
// struct mscclppWork {
// struct mscclppWorkHeader header;
// union {
// char pad[MSCCLPP_WORK_SIZE - sizeof(struct mscclppWorkHeader)];
// struct mscclppWorkElem elems[MSCCLPP_MAX_WORK_ELEMENTS];
// struct mscclppWorkElemP2p p2pElems[MSCCLPP_MAX_WORK_ELEMENTS_P2P];
// struct mscclppWorkElemReg regElems[MSCCLPP_MAX_WORK_ELEMENTS_REG];
// };
// };
// static_assert(sizeof(struct mscclppWork) == MSCCLPP_WORK_SIZE, "Sanity check: sizeof(struct mscclppWork) == MSCCLPP_WORK_SIZE");
// static_assert(sizeof(struct mscclppWork)%16 == 0, "Sanity check: sizeof(struct mscclppWork)%16 == 0");
struct mscclppDevChannelPeer {
// Stripped version of mscclppChannelPeer where we only keep the mscclppConnInfo
// instead of the full mscclppConnector.
struct mscclppConnInfo send[MSCCLPP_MAX_CONNS];
struct mscclppConnInfo recv[MSCCLPP_MAX_CONNS];
};
// struct alignas(16) mscclppDevChannel {
// struct mscclppDevChannelPeer *peers;
// struct mscclppRing ring;
// struct mscclppTree tree;
// struct mscclppTree collnetChain;
// struct mscclppDirect collnetDirect;
// uint32_t* workFifoDone; // Location of done counter, device writes index+1 of last work processed
// };
// struct mscclppDevComm {
// int rank;
// int nRanks;
// int buffSizes[MSCCLPP_NUM_PROTOCOLS];
// // Operation list for aggregation
// int workFifoDepth;
// struct mscclppWork* workFifoHeap; // may be cudaHost or GDR memory
// // Flag to ask MSCCLPP kernels to abort
// volatile uint32_t* abortFlag;
// // Channels, device side
// struct mscclppDevChannel* channels/*[MAXCHANNELS]*/;
// };
// struct alignas(16) mscclppDevCommAndChannels {
// struct mscclppDevComm comm;
// struct mscclppDevChannel channels[MAXCHANNELS];
// };
#endif

View File

@@ -1,115 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_GRAPH_H_
#define MSCCLPP_GRAPH_H_
#include "mscclpp.h"
#include "devcomm.h"
#include <limits.h>
#include <stdlib.h>
#include <ctype.h>
#include <stdio.h>
#include <sched.h>
// mscclppResult_t mscclppTopoCudaPath(int cudaDev, char** path);
// struct mscclppTopoSystem;
// // Build the topology
// mscclppResult_t mscclppTopoGetSystem(struct mscclppComm* comm, struct mscclppTopoSystem** system);
// mscclppResult_t mscclppTopoSortSystem(struct mscclppTopoSystem* system);
// mscclppResult_t mscclppTopoPrint(struct mscclppTopoSystem* system);
// mscclppResult_t mscclppTopoComputePaths(struct mscclppTopoSystem* system, struct mscclppComm* comm);
// void mscclppTopoFree(struct mscclppTopoSystem* system);
// mscclppResult_t mscclppTopoTrimSystem(struct mscclppTopoSystem* system, struct mscclppComm* comm);
// mscclppResult_t mscclppTopoComputeP2pChannels(struct mscclppComm* comm);
// mscclppResult_t mscclppTopoGetNvbGpus(struct mscclppTopoSystem* system, int rank, int* nranks, int** ranks);
// int mscclppTopoPathAllNVLink(struct mscclppTopoSystem* system);
// // Query topology
// mscclppResult_t mscclppTopoGetNetDev(struct mscclppComm* comm, int rank, struct mscclppTopoGraph* graph, int channelId, int peerRank, int* net, int* proxyRank);
// mscclppResult_t mscclppTopoCheckP2p(struct mscclppTopoSystem* system, int64_t id1, int64_t id2, int* p2p, int *read, int* intermediateRank);
// mscclppResult_t mscclppTopoCheckGdr(struct mscclppTopoSystem* topo, int64_t busId, int netDev, int read, int* useGdr);
// mscclppResult_t mscclppTopoNeedFlush(struct mscclppTopoSystem* system, int64_t busId, int* flush);
// mscclppResult_t mscclppTopoCheckNet(struct mscclppTopoSystem* system, int64_t id1, int64_t id2, int* net);
// int mscclppPxnDisable(struct mscclppComm* comm);
// mscclppResult_t mscclppTopoGetPxnRanks(struct mscclppComm* comm, int** intermediateRanks, int* nranks);
// mscclppResult_t mscclppTopoGetLocalRank(struct mscclppTopoSystem* system, int rank, int* localRank);
// // Find CPU affinity
// mscclppResult_t mscclppTopoGetCpuAffinity(struct mscclppTopoSystem* system, int rank, cpu_set_t* affinity);
// #define MSCCLPP_TOPO_CPU_ARCH_X86 1
// #define MSCCLPP_TOPO_CPU_ARCH_POWER 2
// #define MSCCLPP_TOPO_CPU_ARCH_ARM 3
// #define MSCCLPP_TOPO_CPU_VENDOR_INTEL 1
// #define MSCCLPP_TOPO_CPU_VENDOR_AMD 2
// #define MSCCLPP_TOPO_CPU_VENDOR_ZHAOXIN 3
// #define MSCCLPP_TOPO_CPU_TYPE_BDW 1
// #define MSCCLPP_TOPO_CPU_TYPE_SKL 2
// #define MSCCLPP_TOPO_CPU_TYPE_YONGFENG 1
// mscclppResult_t mscclppTopoCpuType(struct mscclppTopoSystem* system, int* arch, int* vendor, int* model);
// mscclppResult_t mscclppTopoGetNetCount(struct mscclppTopoSystem* system, int* count);
// mscclppResult_t mscclppTopoGetNvsCount(struct mscclppTopoSystem* system, int* count);
// mscclppResult_t mscclppTopoGetLocalNet(struct mscclppTopoSystem* system, int rank, int* id);
#define MSCCLPP_TOPO_MAX_NODES 256
// // Init search. Needs to be done before calling mscclppTopoCompute
// mscclppResult_t mscclppTopoSearchInit(struct mscclppTopoSystem* system);
// #define MSCCLPP_TOPO_PATTERN_BALANCED_TREE 1 // Spread NIC traffic between two GPUs (Tree parent + one child on first GPU, second child on second GPU)
// #define MSCCLPP_TOPO_PATTERN_SPLIT_TREE 2 // Spread NIC traffic between two GPUs (Tree parent on first GPU, tree children on the second GPU)
// #define MSCCLPP_TOPO_PATTERN_TREE 3 // All NIC traffic going to/from the same GPU
// #define MSCCLPP_TOPO_PATTERN_RING 4 // Ring
struct mscclppTopoGraph {
// Input / output
int id; // ring : 0, tree : 1, collnet : 2
int pattern;
int crossNic;
int collNet;
int minChannels;
int maxChannels;
// Output
int nChannels;
float bwIntra;
float bwInter;
float latencyInter;
int typeIntra;
int typeInter;
int sameChannels;
int nHops;
int intra[MAXCHANNELS*MSCCLPP_TOPO_MAX_NODES];
int inter[MAXCHANNELS*2];
};
// mscclppResult_t mscclppTopoCompute(struct mscclppTopoSystem* system, struct mscclppTopoGraph* graph);
// mscclppResult_t mscclppTopoPrintGraph(struct mscclppTopoSystem* system, struct mscclppTopoGraph* graph);
// mscclppResult_t mscclppTopoDumpGraphs(struct mscclppTopoSystem* system, int ngraphs, struct mscclppTopoGraph** graphs);
// struct mscclppTopoRanks {
// int ringRecv[MAXCHANNELS];
// int ringSend[MAXCHANNELS];
// int ringPrev[MAXCHANNELS];
// int ringNext[MAXCHANNELS];
// int treeToParent[MAXCHANNELS];
// int treeToChild0[MAXCHANNELS];
// int treeToChild1[MAXCHANNELS];
// };
// mscclppResult_t mscclppTopoPreset(struct mscclppComm* comm,
// struct mscclppTopoGraph* treeGraph, struct mscclppTopoGraph* ringGraph, struct mscclppTopoGraph* collNetGraph,
// struct mscclppTopoRanks* topoRanks);
// mscclppResult_t mscclppTopoPostset(struct mscclppComm* comm, int* firstRanks, int* treePatterns,
// struct mscclppTopoRanks** allTopoRanks, int* rings, struct mscclppTopoGraph* collNetGraph);
// mscclppResult_t mscclppTopoTuneModel(struct mscclppComm* comm, int minCompCap, int maxCompCap, struct mscclppTopoGraph* treeGraph, struct mscclppTopoGraph* ringGraph, struct mscclppTopoGraph* collNetGraph);
// #include "info.h"
// mscclppResult_t mscclppTopoGetAlgoTime(struct mscclppInfo* info, int algorithm, int protocol, int numPipeOps, float* time);
#endif

View File

@@ -1,119 +0,0 @@
/*************************************************************************
* Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_INFO_H_
#define MSCCLPP_INFO_H_
#include "mscclpp.h"
#include "devcomm.h"
// #include "collectives.h"
#include "core.h"
#include "utils.h"
#include "strongstream.h"
// typedef enum : uint8_t {
// mscclppPatternRing,
// mscclppPatternRingTwice,
// mscclppPatternPipelineFrom,
// mscclppPatternPipelineTo,
// mscclppPatternTreeUp,
// mscclppPatternTreeDown,
// mscclppPatternTreeUpDown,
// mscclppPatternCollnetChain,
// mscclppPatternCollnetDirect,
// mscclppPatternSend,
// mscclppPatternRecv
// } mscclppPattern_t;
// // Used to pass MSCCLPP call information between functions
// struct mscclppInfo {
// mscclppFunc_t coll;
// const char* opName;
// // MSCCLPP Coll Args
// const void* sendbuff;
// void* recvbuff;
// size_t count;
// mscclppDataType_t datatype;
// mscclppRedOp_t op;
// int root; // peer for p2p operations
// mscclppComm_t comm;
// cudaStream_t stream;
// // Algorithm details
// int chunkSteps;
// int sliceSteps;
// // Computed later
// mscclppDevRedOpFull opFull;
// int algorithm;
// int protocol;
// mscclppPattern_t pattern;
// int nChannels;
// int nThreads;
// size_t nBytes;
// int nstepsPerLoop;
// int nchunksPerLoop;
// int chunkSize;
// int channelId;
// };
// inline mscclppResult_t mscclppInfoSetDerived(struct mscclppInfo* info, int nRanks) {
// info->nBytes = info->count * mscclppTypeSize(info->datatype);
// if (info->coll == mscclppFuncAllGather || info->coll == mscclppFuncBroadcast) {
// info->count = info->nBytes;
// info->datatype = mscclppInt8;
// }
// if (info->coll == mscclppFuncAllGather || info->coll == mscclppFuncReduceScatter) info->nBytes *= nRanks; // count is per rank
// return mscclppSuccess;
// }
// struct mscclppTaskColl {
// struct mscclppTaskColl* next;
// mscclppFunc_t func;
// void const* sendbuff;
// void* recvbuff;
// size_t count;
// int root;
// mscclppDataType_t datatype;
// mscclppDevRedOpFull op;
// int chunkSteps, sliceSteps;
// };
// struct mscclppTaskP2p {
// mscclppTaskP2p *next;
// void *buff;
// size_t bytes;
// // Stateful chunk index. If a p2p gets "cut" over two plans this keeps track
// // of where it left off.
// int chunk;
// };
// struct mscclppCudaStreamList {
// struct mscclppCudaStreamList *next;
// cudaStream_t stream;
// };
// struct mscclppTasks {
// struct Peer {
// bool sendSeen, recvSeen;
// struct mscclppIntruQueue<struct mscclppTaskP2p, &mscclppTaskP2p::next> sendQueue;
// struct mscclppIntruQueue<struct mscclppTaskP2p, &mscclppTaskP2p::next> recvQueue;
// };
// struct mscclppIntruQueue<mscclppTaskColl, &mscclppTaskColl::next> collQueue;
// size_t collBytesTotal;
// struct Peer* peers/*[nRanks]*/;
// int *p2pSendOrder/*[nRanks]*/, *p2pRecvOrder/*[nRanks]*/;
// int nTasksColl, nTasksP2p;
// // The list of user streams aggregated over all tasks present.
// struct mscclppCudaStreamList* streams;
// // The most recent user stream. Ignored if streams==nullptr
// cudaStream_t streamRecent;
// // The graph capturing all user streams or invalid if none. Thus we restrict the
// // user that all streams must be captured in the same graph or not captured
// // at all. Technically we could probably relax this, but that would mean
// // collecting a different `mscclppTasks` per graph and one for non-graph.
// struct mscclppCudaGraph capturingGraph;
// };
#endif

View File

@@ -95,7 +95,7 @@
// // Make sure we have enough to store two full rounds of operations on all channels.
// // Otherwise we'd be unable to post half of them to free new elements.
// #define MAX_OPS_PER_PEER (2*MAXCHANNELS*MSCCLPP_MAX_WORK_ELEMENTS_P2P)
#define MSCCLPP_MAX_LOCAL_RANKS 64
// #define MSCCLPP_MAX_LOCAL_RANKS 64
// struct mscclppProxyOpsPool {
// struct mscclppProxyOp ops[MAX_OPS_PER_PEER*MSCCLPP_MAX_LOCAL_RANKS];
// volatile int nextOps;

View File

@@ -1,140 +0,0 @@
/*************************************************************************
* Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_STRONGSTREAM_H_
#define MSCCLPP_STRONGSTREAM_H_
#include "mscclpp.h"
#include "checks.h"
#include <stdint.h>
// /* mscclppCudaGraph: Wraps a cudaGraph_t so that we can support pre-graph CUDA runtimes
// * easily.
// */
// struct mscclppCudaGraph {
// #if CUDART_VERSION >= 11030
// cudaGraph_t graph;
// unsigned long long graphId;
// #endif
// };
// inline struct mscclppCudaGraph mscclppCudaGraphNone() {
// struct mscclppCudaGraph tmp;
// #if CUDART_VERSION >= 11030
// tmp.graph = nullptr;
// tmp.graphId = ULLONG_MAX;
// #endif
// return tmp;
// }
// inline bool mscclppCudaGraphValid(struct mscclppCudaGraph graph) {
// #if CUDART_VERSION >= 11030
// return graph.graph != nullptr;
// #else
// return false;
// #endif
// }
// inline bool mscclppCudaGraphSame(struct mscclppCudaGraph a, struct mscclppCudaGraph b) {
// #if CUDART_VERSION >= 11030
// return a.graphId == b.graphId;
// #else
// return true;
// #endif
// }
// mscclppResult_t mscclppCudaGetCapturingGraph(struct mscclppCudaGraph* graph, cudaStream_t stream);
// mscclppResult_t mscclppCudaGraphAddDestructor(struct mscclppCudaGraph graph, cudaHostFn_t fn, void* arg);
// /* mscclppStrongStream: An abstraction over CUDA streams that do not lose their
// * identity while being captured. Regular streams have the deficiency that the
// * captured form of a stream in one graph launch has no relation to the
// * uncaptured stream or to the captured form in other graph launches. This makes
// * streams unfit for the use of serializing access to a persistent resource.
// * Strong streams have been introduced to address this need.
// *
// * - All updates to a strong stream must be enclosed by a Acquire/Release pair.
// *
// * - The Acquire, Release, and all updates take a mscclppCudaGraph parameter
// * indicating the currently capturing graph (or none). This parameter must be
// * the same for the entire sequence of {Acquire; ...; Release}.
// *
// * - An {Acquire; ...; Release} sequence must not be concurrent with any
// * other operations against the strong stream including graph launches which
// * reference this stream.
// */
// struct mscclppStrongStream;
// mscclppResult_t mscclppStrongStreamConstruct(struct mscclppStrongStream* ss);
// mscclppResult_t mscclppStrongStreamDestruct(struct mscclppStrongStream* ss);
// // Acquire-fence the strong stream.
// mscclppResult_t mscclppStrongStreamAcquire(
// struct mscclppCudaGraph graph, struct mscclppStrongStream* ss
// );
// // Acquire-fence the strong stream assuming no graph is capturing. This permits
// // the caller to enqueue directly to the `ss->cudaStream` member using native CUDA
// // calls. Strong stream still must be released via:
// // mscclppStrongStreamRelease(mscclppCudaGraphNone(), ss);
// mscclppResult_t mscclppStrongStreamAcquireUncaptured(struct mscclppStrongStream* ss);
// // Release-fence of the strong stream.
// mscclppResult_t mscclppStrongStreamRelease(struct mscclppCudaGraph graph, struct mscclppStrongStream* ss);
// // Add a host launch to the stream.
// mscclppResult_t mscclppStrongStreamLaunchHost(
// struct mscclppCudaGraph graph, struct mscclppStrongStream* ss,
// cudaHostFn_t fn, void* arg
// );
// // Add a kernel launch to the stream.
// mscclppResult_t mscclppStrongStreamLaunchKernel(
// struct mscclppCudaGraph graph, struct mscclppStrongStream* ss,
// void* fn, dim3 grid, dim3 block, void** args, size_t sharedMemBytes
// );
// // Cause `a` to wait for the current state `b`. Both `a` and `b` must be acquired.
// // `b_subsumes_a` indicates that all work in `a` is already present in `b`, thus
// // we want to fast-forward `a` to be a clone of `b`. Knowing this permits the
// // implementation to induce few graph dependencies.
// mscclppResult_t mscclppStrongStreamWaitStream(
// struct mscclppCudaGraph graph, struct mscclppStrongStream* a, struct mscclppStrongStream* b, bool b_subsumes_a=false
// );
// // `b` must be capturing within `graph`.
// mscclppResult_t mscclppStrongStreamWaitStream(
// struct mscclppCudaGraph graph, struct mscclppStrongStream* a, cudaStream_t b, bool b_subsumes_a=false
// );
// // `a` must be capturing within `graph`.
// mscclppResult_t mscclppStrongStreamWaitStream(
// struct mscclppCudaGraph graph, cudaStream_t a, struct mscclppStrongStream* b, bool b_subsumes_a=false
// );
// // Synchrnoization does not need the strong stream to be acquired.
// mscclppResult_t mscclppStrongStreamSynchronize(struct mscclppStrongStream* ss);
// ////////////////////////////////////////////////////////////////////////////////
// struct mscclppStrongStreamGraph; // internal to mscclppStrongStream
struct mscclppStrongStream {
// Used when not graph capturing.
cudaStream_t cudaStream;
// #if CUDART_VERSION >= 11030
// // The event used to establish order between graphs and streams. During acquire
// // this event is waited on, during release it is recorded to.
// cudaEvent_t serialEvent;
// // This stream ever appeared in a graph capture.
// bool everCaptured;
// // Tracks whether serialEvent needs to be recorded to upon Release().
// bool serialEventNeedsRecord;
// struct mscclppStrongStreamGraph* graphHead;
// #else
// cudaEvent_t scratchEvent;
// #endif
};
#endif

View File

@@ -1,78 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_TRANSPORT_H_
#define MSCCLPP_TRANSPORT_H_
#include "devcomm.h"
#include "graph.h"
// #include "nvmlwrap.h"
#include "core.h"
#define NTRANSPORTS 4
#define TRANSPORT_P2P 0
#define TRANSPORT_SHM 1
#define TRANSPORT_NET 2
#define TRANSPORT_COLLNET 3
#include "proxy.h"
extern struct mscclppTransport p2pTransport;
extern struct mscclppTransport shmTransport;
extern struct mscclppTransport netTransport;
extern struct mscclppTransport collNetTransport;
extern struct mscclppTransport* mscclppTransports[];
// Forward declarations
struct mscclppRing;
struct mscclppConnector;
struct mscclppComm;
struct mscclppPeerInfo {
int rank;
int cudaDev;
int netDev;
int gdrSupport;
uint64_t hostHash;
uint64_t pidHash;
dev_t shmDev;
int64_t busId;
struct mscclppComm* comm;
int cudaCompCap;
};
#define CONNECT_SIZE 128
struct mscclppConnect {
char data[CONNECT_SIZE];
};
struct mscclppTransportComm {
mscclppResult_t (*setup)(struct mscclppComm* comm, struct mscclppTopoGraph* graph, struct mscclppPeerInfo*, struct mscclppPeerInfo*, struct mscclppConnect*, struct mscclppConnector*, int channelId, int connIndex);
mscclppResult_t (*connect)(struct mscclppComm* comm, struct mscclppConnect*, int nranks, int rank, struct mscclppConnector*);
mscclppResult_t (*free)(struct mscclppConnector*);
mscclppResult_t (*proxySharedInit)(struct mscclppProxyConnection* connection, struct mscclppComm* comm, int nChannels);
mscclppResult_t (*proxySetup)(struct mscclppProxyConnection* connection, struct mscclppComm* comm, void* reqBuff, int reqSize, void* respBuff, int respSize, int* done);
mscclppResult_t (*proxyConnect)(struct mscclppProxyConnection* connection, struct mscclppComm* comm, void* reqBuff, int reqSize, void* respBuff, int respSize, int* done);
mscclppResult_t (*proxyFree)(struct mscclppProxyConnection* connection, struct mscclppComm* comm);
mscclppResult_t (*proxyProgress)(struct mscclppComm* comm, struct mscclppProxyArgs*);
};
struct mscclppTransport {
const char name[4];
mscclppResult_t (*canConnect)(int*, struct mscclppTopoSystem* topo, struct mscclppTopoGraph* graph, struct mscclppPeerInfo*, struct mscclppPeerInfo*);
struct mscclppTransportComm send;
struct mscclppTransportComm recv;
};
mscclppResult_t mscclppTransportP2pConnect(struct mscclppComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex);
mscclppResult_t mscclppTransportP2pSetup(struct mscclppComm* comm, struct mscclppTopoGraph* graph, int connIndex, int* highestTransportType=NULL);
enum { collNetRecv=0, collNetSend=1 };
int mscclppTransportCollNetSetup(struct mscclppComm* comm, struct mscclppTopoGraph* collNetGraph, struct mscclppChannel* channel, int masterRank, int masterPeer, int collNetGraphChannelId, int type);
mscclppResult_t mscclppTransportCollNetCheck(struct mscclppComm* comm, int collNetSetupFail);
mscclppResult_t mscclppTransportCollNetFree(struct mscclppComm* comm);
#endif

View File

@@ -1,272 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "comm.h"
#include "info.h"
#include "bootstrap.h"
// #define ENABLE_TIMER 0
// #include "timer.h"
struct mscclppTransport* mscclppTransports[NTRANSPORTS] = {
&p2pTransport,
&shmTransport,
&netTransport,
&collNetTransport
};
template <int type>
static mscclppResult_t selectTransport(struct mscclppComm* comm, struct mscclppTopoGraph* graph, struct mscclppConnect* connect, int channelId, int peer, int connIndex, int* transportType) {
struct mscclppPeerInfo* myInfo = comm->peerInfo+comm->rank;
struct mscclppPeerInfo* peerInfo = comm->peerInfo+peer;
struct mscclppConnector* connector = (type == 1) ? comm->channels[channelId].peers[peer].send + connIndex :
comm->channels[channelId].peers[peer].recv + connIndex;
for (int t=0; t<NTRANSPORTS; t++) {
struct mscclppTransport *transport = mscclppTransports[t];
struct mscclppTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
int ret = 0;
MSCCLPPCHECK(transport->canConnect(&ret, comm->topo, graph, myInfo, peerInfo));
if (ret) {
connector->transportComm = transportComm;
MSCCLPPCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId, connIndex));
if (transportType) *transportType = t;
return mscclppSuccess;
}
}
WARN("No transport found for rank %d[%lx] -> rank %d[%lx]", myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
return mscclppSystemError;
}
mscclppResult_t mscclppTransportP2pConnect(struct mscclppComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
TRACE(MSCCLPP_INIT, "nsend %d nrecv %d", nsend, nrecv);
struct mscclppChannel* channel = &comm->channels[channelId];
uint64_t mask = 1UL << channel->id;
for (int i=0; i<nrecv; i++) {
int peer = peerRecv[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].recv[connIndex].connected) continue;
comm->connectRecv[peer] |= mask;
}
for (int i=0; i<nsend; i++) {
int peer = peerSend[i];
if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].send[connIndex].connected) continue;
comm->connectSend[peer] |= mask;
}
return mscclppSuccess;
}
void dumpData(struct mscclppConnect* data, int ndata) {
for (int n=0; n<ndata; n++) {
printf("[%d] ", n);
uint8_t* d = (uint8_t*)data;
for (int i=0; i<sizeof(struct mscclppConnect); i++) printf("%02x", d[i]);
printf("\n");
}
}
mscclppResult_t mscclppTransportP2pSetup(struct mscclppComm* comm, struct mscclppTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) {
// Stream used during transport setup; need for P2P pre-connect + CUDA Graph
mscclppResult_t ret = mscclppSuccess;
int highestType = TRANSPORT_P2P; // track highest transport type
struct mscclppConnect data[2*MAXCHANNELS];
// MSCCLPPCHECKGOTO(mscclppStrongStreamAcquireUncaptured(&comm->hostStream), ret, fail);
for (int i=1; i<comm->nRanks; i++) {
int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
int sendPeer = (comm->rank + i) % comm->nRanks;
uint64_t recvMask = comm->connectRecv[recvPeer];
uint64_t sendMask = comm->connectSend[sendPeer];
struct mscclppConnect* recvData = data;
int sendChannels = 0, recvChannels = 0;
int type;
// TIME_START(0);
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1UL<<c)) {
MSCCLPPCHECKGOTO(selectTransport<0>(comm, graph, recvData+recvChannels++, c, recvPeer, connIndex, &type), ret, fail);
if (type > highestType) highestType = type;
}
}
// TIME_STOP(0);
// TIME_START(1);
struct mscclppConnect* sendData = recvData+recvChannels;
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1UL<<c)) {
MSCCLPPCHECKGOTO(selectTransport<1>(comm, graph, sendData+sendChannels++, c, sendPeer, connIndex, &type), ret, fail);
if (type > highestType) highestType = type;
}
}
// TIME_STOP(1);
// TIME_START(2);
if (sendPeer == recvPeer) {
if (recvChannels+sendChannels) {
MSCCLPPCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data, sizeof(struct mscclppConnect)*(recvChannels+sendChannels)), ret, fail);
MSCCLPPCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data, sizeof(struct mscclppConnect)*(recvChannels+sendChannels)), ret, fail);
sendData = data;
recvData = data+sendChannels;
}
} else {
if (recvChannels) MSCCLPPCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData, sizeof(struct mscclppConnect)*recvChannels), ret, fail);
if (sendChannels) MSCCLPPCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData, sizeof(struct mscclppConnect)*sendChannels), ret, fail);
if (sendChannels) MSCCLPPCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData, sizeof(struct mscclppConnect)*sendChannels), ret, fail);
if (recvChannels) MSCCLPPCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData, sizeof(struct mscclppConnect)*recvChannels), ret, fail);
}
// TIME_STOP(2);
// TIME_START(3);
for (int c=0; c<MAXCHANNELS; c++) {
if (sendMask & (1UL<<c)) {
struct mscclppConnector* conn = comm->channels[c].peers[sendPeer].send + connIndex;
MSCCLPPCHECKGOTO(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn), ret, fail);
conn->connected = 1;
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeers[sendPeer].send[connIndex], &conn->conn, sizeof(struct mscclppConnInfo), cudaMemcpyHostToDevice, comm->hostStream.cudaStream), ret, fail);
}
}
// TIME_STOP(3);
// TIME_START(4);
for (int c=0; c<MAXCHANNELS; c++) {
if (recvMask & (1UL<<c)) {
struct mscclppConnector* conn = comm->channels[c].peers[recvPeer].recv + connIndex;
MSCCLPPCHECKGOTO(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn), ret, fail);
conn->connected = 1;
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeers[recvPeer].recv[connIndex], &conn->conn, sizeof(struct mscclppConnInfo), cudaMemcpyHostToDevice, comm->hostStream.cudaStream), ret, fail);
}
}
// TIME_STOP(4);
comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL;
}
if (highestTransportType != NULL) *highestTransportType = highestType;
// TIME_PRINT("P2P Setup/Connect");
exit:
// MSCCLPPCHECK(mscclppStrongStreamWaitStream(mscclppCudaGraphNone(), &comm->deviceStream, &comm->hostStream));
// MSCCLPPCHECK(mscclppStrongStreamRelease(mscclppCudaGraphNone(), &comm->hostStream));
return ret;
fail:
goto exit;
}
extern struct mscclppTransport collNetTransport;
// All ranks must participate in collNetSetup call
// We do not MSCCLPPCHECK this call because we would fall back to P2P network in case CollNet setup fails
int mscclppTransportCollNetSetup(struct mscclppComm* comm, struct mscclppTopoGraph* collNetGraph, struct mscclppChannel* channel, int masterRank, int masterPeer, int collNetGraphChannelId, int type) {
int fail = 1;
int rank = comm->rank;
int nranks = comm->nRanks;
int nMasters = comm->nNodes;
int rankInCollNet = -1;
int isMaster = (rank == masterRank) ? 1 : 0;
struct {
int collNetRank;
mscclppConnect connect;
} sendrecvExchange;
// check if we can connect to collnet, whose root is the nranks-th rank
struct mscclppPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks;
peerInfo->rank = nranks;
// send master receives connect info from peer recv master
if (isMaster && type == collNetSend) {
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange)));
rankInCollNet = sendrecvExchange.collNetRank;
TRACE(MSCCLPP_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer);
}
// select
struct mscclppChannelPeer* root = channel->peers+nranks;
// connector index: 0 for recv, 1 for send
struct mscclppConnector* conn = (type == collNetRecv) ? root->recv+type : root->send+type;
struct mscclppTransportComm* transportComm = (type == collNetRecv) ? &(collNetTransport.recv) : &(collNetTransport.send);
conn->transportComm = transportComm;
// setup
struct mscclppConnect myConnect;
if (isMaster) {
MSCCLPPCHECK(transportComm->setup(comm, collNetGraph, myInfo, peerInfo, &myConnect, conn, collNetGraphChannelId, type));
}
// prepare connect handles
mscclppResult_t res;
struct {
int isMaster;
mscclppConnect connect;
} *allConnects = NULL;
mscclppConnect *masterConnects = NULL;
MSCCLPPCHECK(mscclppCalloc(&masterConnects, nMasters));
if (type == collNetRecv) { // recv side: AllGather
// all ranks must participate
MSCCLPPCHECK(mscclppCalloc(&allConnects, nranks));
allConnects[rank].isMaster = isMaster;
memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct mscclppConnect));
MSCCLPPCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup);
// consolidate
int c = 0;
for (int r = 0; r < nranks; r++) {
if (allConnects[r].isMaster) {
memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct mscclppConnect));
if (r == rank) rankInCollNet = c;
c++;
}
}
} else { // send side : copy in connect info received from peer recv master
if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct mscclppConnect));
}
// connect
if (isMaster) {
MSCCLPPCHECKGOTO(transportComm->connect(comm, masterConnects, nMasters, rankInCollNet, conn), res, cleanup);
struct mscclppDevChannelPeer* devRoot = channel->devPeers+nranks;
struct mscclppConnInfo* devConnInfo = (type == collNetRecv) ? devRoot->recv+type : devRoot->send+type;
CUDACHECKGOTO(cudaMemcpy(devConnInfo, &conn->conn, sizeof(struct mscclppConnInfo), cudaMemcpyHostToDevice), res, cleanup);
}
// recv side sends connect info to send side
if (isMaster && type == collNetRecv) {
sendrecvExchange.collNetRank = rankInCollNet;
memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct mscclppConnect));
MSCCLPPCHECKGOTO(bootstrapSend(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange)), res, cleanup);
TRACE(MSCCLPP_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer);
}
fail = 0;
cleanup:
if (allConnects != NULL) free(allConnects);
if (masterConnects != NULL) free(masterConnects);
return fail;
}
mscclppResult_t mscclppTransportCollNetCheck(struct mscclppComm* comm, int collNetSetupFail) {
// AllGather collNet setup results
int allGatherFailures[MSCCLPP_MAX_LOCAL_RANKS] = {0};
allGatherFailures[comm->localRank] = collNetSetupFail;
MSCCLPPCHECK(bootstrapIntraNodeAllGather(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, allGatherFailures, sizeof(int)));
for (int i=0; i<comm->localRanks; i++) {
if (allGatherFailures[i] != 0) {
collNetSetupFail = 1;
break;
}
}
if (collNetSetupFail) {
if (comm->localRank == 0) WARN("Cannot initialize CollNet, using point-to-point network instead");
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t mscclppTransportCollNetFree(struct mscclppComm* comm) {
// Free collNet resources
for (int r=0; r<comm->nChannels; r++) {
struct mscclppChannel* channel = comm->channels+r;
struct mscclppChannelPeer* peer = channel->peers+comm->nRanks;
for (int b=0; b<MSCCLPP_MAX_CONNS; b++) {
struct mscclppConnector* send = peer->send + b;
if (send->transportResources && send->transportComm) MSCCLPPCHECK(send->transportComm->free(send));
send->transportResources = NULL; // avoid double free
}
for (int b=0; b<MSCCLPP_MAX_CONNS; b++) {
struct mscclppConnector* recv = peer->recv + b;
if (recv->transportResources && recv->transportComm) MSCCLPPCHECK(recv->transportComm->free(recv));
recv->transportResources = NULL; // avoid double free
}
}
return mscclppSuccess;
}