ep tests: clean shutdown to silence TCPStore/HeartbeatMonitor noise

Add dist.barrier() + dist.destroy_process_group() in a finally block so
non-zero ranks don't poll the TCPStore after rank 0 (the store server)
exits. Previously that polling produced noisy 'recvValue failed /
Connection was likely closed' stack traces from ProcessGroupNCCL's
HeartbeatMonitor.

Also pass device_id to init_process_group in the internode test to
silence 'Guessing device ID based on global rank' warnings.
This commit is contained in:
Qinghua Zhou
2026-04-29 05:16:22 +00:00
parent 0227626335
commit afbdcd6a3d
3 changed files with 37 additions and 5 deletions

View File

@@ -38,7 +38,8 @@ def init_dist():
world_size = int(os.environ["WORLD_SIZE"])
local_rank = int(os.environ.get("LOCAL_RANK", rank % 8))
torch.cuda.set_device(local_rank)
dist.init_process_group(backend="nccl", world_size=world_size, rank=rank)
dist.init_process_group(backend="nccl", world_size=world_size, rank=rank,
device_id=torch.device(f"cuda:{local_rank}"))
return rank, world_size, local_rank, dist.new_group(list(range(world_size)))
@@ -381,3 +382,14 @@ if __name__ == "__main__":
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# Ordered shutdown: barrier so every rank reaches teardown before the
# TCPStore server (rank 0) exits, then destroy the PG. Without this,
# ProcessGroupNCCL's HeartbeatMonitor on non-zero ranks logs noisy
# "recvValue failed / Connection was likely closed" stack traces.
if dist.is_initialized():
try:
dist.barrier()
except Exception:
pass
dist.destroy_process_group()

View File

@@ -327,3 +327,14 @@ if __name__ == "__main__":
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# Ordered shutdown: barrier so every rank reaches teardown before the
# TCPStore server (rank 0) exits, then destroy the PG. Avoids noisy
# "recvValue failed / Connection was likely closed" stack traces from
# ProcessGroupNCCL's HeartbeatMonitor.
if dist.is_initialized():
try:
dist.barrier()
except Exception:
pass
dist.destroy_process_group()

View File

@@ -308,7 +308,16 @@ if __name__ == "__main__":
try:
main()
finally:
try:
dist.destroy_process_group()
except Exception:
pass
# Ordered shutdown: barrier so every rank reaches teardown before the
# TCPStore server (rank 0) exits, then destroy the PG. Avoids noisy
# "recvValue failed / Connection was likely closed" stack traces from
# ProcessGroupNCCL's HeartbeatMonitor.
if dist.is_initialized():
try:
dist.barrier()
except Exception:
pass
try:
dist.destroy_process_group()
except Exception:
pass