Files
ComfyUI/comfy/isolation/rpc_bridge.py
John Pollock c5e7b9cdaf feat(isolation): process isolation for custom nodes via pyisolate
Adds opt-in process isolation for custom nodes using pyisolate's
bwrap sandbox and JSON-RPC bridge. Each isolated node pack runs in
its own child process with zero-copy tensor transfer via shared memory.

Core infrastructure:
- CLI flag --use-process-isolation to enable isolation
- Host/child startup fencing via PYISOLATE_CHILD env var
- Manifest-driven node discovery and extension loading
- JSON-RPC bridge between host and child processes
- Shared memory forensics for leak detection

Proxy layer:
- ModelPatcher, CLIP, VAE, and ModelSampling proxies
- Host service proxies (folder_paths, model_management, progress, etc.)
- Proxy base with automatic method forwarding

Execution integration:
- Extension wrapper with V3 hidden param mapping
- Runtime helpers for isolated node execution
- Host policy for node isolation decisions
- Fenced sampler device handling and model ejection parity

Serializers for cross-process data transfer:
- File3D (GLB), PLY (structured + gaussian), NPZ (streaming frames),
  VIDEO (VideoFromFile + VideoFromComponents) serializers
- data_type flag in SerializerRegistry for type-aware dispatch
- Isolated get_temp_directory() fence

New core save nodes:
- SavePLY and SaveNPZ with comfytype registrations (Ply, Npz)

DynamicVRAM compatibility:
- comfy-aimdo early init gated by isolation fence

Tests:
- Integration and policy tests for isolation lifecycle
- Manifest loader, host policy, proxy, and adapter unit tests

Depends on: pyisolate >= 0.9.2
2026-03-12 01:13:43 -05:00

50 lines
1.4 KiB
Python

import asyncio
import logging
import threading
logger = logging.getLogger(__name__)
class RpcBridge:
"""Minimal helper to run coroutines synchronously inside isolated processes.
If an event loop is already running, the coroutine is executed on a fresh
thread with its own loop to avoid nested run_until_complete errors.
"""
def run_sync(self, maybe_coro):
if not asyncio.iscoroutine(maybe_coro):
return maybe_coro
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
result_container = {}
exc_container = {}
def _runner():
try:
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
result_container["value"] = new_loop.run_until_complete(maybe_coro)
except Exception as exc: # pragma: no cover
exc_container["error"] = exc
finally:
try:
new_loop.close()
except Exception:
pass
t = threading.Thread(target=_runner, daemon=True)
t.start()
t.join()
if "error" in exc_container:
raise exc_container["error"]
return result_container.get("value")
return asyncio.run(maybe_coro)