feat(isolation): process isolation for custom nodes via pyisolate

Adds opt-in process isolation for custom nodes using pyisolate's
bwrap sandbox and JSON-RPC bridge. Each isolated node pack runs in
its own child process with zero-copy tensor transfer via shared memory.

Core infrastructure:
- CLI flag --use-process-isolation to enable isolation
- Host/child startup fencing via PYISOLATE_CHILD env var
- Manifest-driven node discovery and extension loading
- JSON-RPC bridge between host and child processes
- Shared memory forensics for leak detection

Proxy layer:
- ModelPatcher, CLIP, VAE, and ModelSampling proxies
- Host service proxies (folder_paths, model_management, progress, etc.)
- Proxy base with automatic method forwarding

Execution integration:
- Extension wrapper with V3 hidden param mapping
- Runtime helpers for isolated node execution
- Host policy for node isolation decisions
- Fenced sampler device handling and model ejection parity

Serializers for cross-process data transfer:
- File3D (GLB), PLY (structured + gaussian), NPZ (streaming frames),
  VIDEO (VideoFromFile + VideoFromComponents) serializers
- data_type flag in SerializerRegistry for type-aware dispatch
- Isolated get_temp_directory() fence

New core save nodes:
- SavePLY and SaveNPZ with comfytype registrations (Ply, Npz)

DynamicVRAM compatibility:
- comfy-aimdo early init gated by isolation fence

Tests:
- Integration and policy tests for isolation lifecycle
- Manifest loader, host policy, proxy, and adapter unit tests

Depends on: pyisolate >= 0.9.2
This commit is contained in:
John Pollock
2026-03-12 01:13:43 -05:00
parent 9ce4c3dd87
commit c5e7b9cdaf
54 changed files with 9061 additions and 78 deletions

View File

@@ -1,7 +1,9 @@
import copy
import gc
import heapq
import inspect
import logging
import os
import sys
import threading
import time
@@ -41,6 +43,8 @@ from comfy_execution.utils import CurrentNodeContext
from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func
from comfy_api.latest import io, _io
_AIMDO_VBAR_RESET_UNAVAILABLE_LOGGED = False
class ExecutionResult(Enum):
SUCCESS = 0
@@ -261,20 +265,31 @@ async def _async_map_node_over_list(prompt_id, unique_id, obj, input_data_all, f
pre_execute_cb(index)
# V3
if isinstance(obj, _ComfyNodeInternal) or (is_class(obj) and issubclass(obj, _ComfyNodeInternal)):
# if is just a class, then assign no state, just create clone
if is_class(obj):
type_obj = obj
obj.VALIDATE_CLASS()
class_clone = obj.PREPARE_CLASS_CLONE(v3_data)
# otherwise, use class instance to populate/reuse some fields
# Check for isolated node - skip validation and class cloning
if hasattr(obj, "_pyisolate_extension"):
# Isolated Node: The stub is just a proxy; real validation happens in child process
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# Inject hidden inputs so they're available in the isolated child process
inputs.update(v3_data.get("hidden_inputs", {}))
f = getattr(obj, func)
# Standard V3 Node (Existing Logic)
else:
type_obj = type(obj)
type_obj.VALIDATE_CLASS()
class_clone = type_obj.PREPARE_CLASS_CLONE(v3_data)
f = make_locked_method_func(type_obj, func, class_clone)
# in case of dynamic inputs, restructure inputs to expected nested dict
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# if is just a class, then assign no resources or state, just create clone
if is_class(obj):
type_obj = obj
obj.VALIDATE_CLASS()
class_clone = obj.PREPARE_CLASS_CLONE(v3_data)
# otherwise, use class instance to populate/reuse some fields
else:
type_obj = type(obj)
type_obj.VALIDATE_CLASS()
class_clone = type_obj.PREPARE_CLASS_CLONE(v3_data)
f = make_locked_method_func(type_obj, func, class_clone)
# in case of dynamic inputs, restructure inputs to expected nested dict
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# V1
else:
f = getattr(obj, func)
@@ -527,7 +542,17 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
if args.verbose == "DEBUG":
comfy_aimdo.control.analyze()
comfy.model_management.reset_cast_buffers()
comfy_aimdo.model_vbar.vbars_reset_watermark_limits()
vbar_lib = getattr(comfy_aimdo.model_vbar, "lib", None)
if vbar_lib is not None:
comfy_aimdo.model_vbar.vbars_reset_watermark_limits()
else:
global _AIMDO_VBAR_RESET_UNAVAILABLE_LOGGED
if not _AIMDO_VBAR_RESET_UNAVAILABLE_LOGGED:
logging.warning(
"DynamicVRAM backend unavailable for watermark reset; "
"skipping vbar reset for this process."
)
_AIMDO_VBAR_RESET_UNAVAILABLE_LOGGED = True
if has_pending_tasks:
pending_async_nodes[unique_id] = output_data
@@ -536,6 +561,14 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
tasks = [x for x in output_data if isinstance(x, asyncio.Task)]
await asyncio.gather(*tasks, return_exceptions=True)
unblock()
# Keep isolation node execution deterministic by default, but allow
# opt-out for diagnostics.
isolation_sequential = os.environ.get("COMFY_ISOLATE_SEQUENTIAL", "1").lower() in ("1", "true", "yes")
if args.use_process_isolation and isolation_sequential:
await await_completion()
return await execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_outputs)
asyncio.create_task(await_completion())
return (ExecutionResult.PENDING, None, None)
if len(output_ui) > 0:
@@ -647,6 +680,46 @@ class PromptExecutor:
self.status_messages = []
self.success = True
async def _notify_execution_graph_safe(self, class_types: set[str], *, fail_loud: bool = False) -> None:
if not args.use_process_isolation:
return
try:
from comfy.isolation import notify_execution_graph
await notify_execution_graph(class_types)
except Exception:
if fail_loud:
raise
logging.debug("][ EX:notify_execution_graph failed", exc_info=True)
async def _flush_running_extensions_transport_state_safe(self) -> None:
if not args.use_process_isolation:
return
try:
from comfy.isolation import flush_running_extensions_transport_state
await flush_running_extensions_transport_state()
except Exception:
logging.debug("][ EX:flush_running_extensions_transport_state failed", exc_info=True)
async def _wait_model_patcher_quiescence_safe(
self,
*,
fail_loud: bool = False,
timeout_ms: int = 120000,
marker: str = "EX:wait_model_patcher_idle",
) -> None:
if not args.use_process_isolation:
return
try:
from comfy.isolation import wait_for_model_patcher_quiescence
await wait_for_model_patcher_quiescence(
timeout_ms=timeout_ms, fail_loud=fail_loud, marker=marker
)
except Exception:
if fail_loud:
raise
logging.debug("][ EX:wait_model_patcher_quiescence failed", exc_info=True)
def add_message(self, event, data: dict, broadcast: bool):
data = {
**data,
@@ -688,6 +761,18 @@ class PromptExecutor:
asyncio.run(self.execute_async(prompt, prompt_id, extra_data, execute_outputs))
async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs=[]):
if args.use_process_isolation:
# Update RPC event loops for all isolated extensions.
# This is critical for serial workflow execution - each asyncio.run() creates
# a new event loop, and RPC instances must be updated to use it.
try:
from comfy.isolation import update_rpc_event_loops
update_rpc_event_loops()
except ImportError:
pass # Isolation not available
except Exception as e:
logging.getLogger(__name__).warning(f"Failed to update RPC event loops: {e}")
set_preview_method(extra_data.get("preview_method"))
nodes.interrupt_processing(False)
@@ -701,6 +786,25 @@ class PromptExecutor:
self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False)
with torch.inference_mode():
if args.use_process_isolation:
try:
# Boundary cleanup runs at the start of the next workflow in
# isolation mode, matching non-isolated "next prompt" timing.
self.caches = CacheSet(cache_type=self.cache_type, cache_args=self.cache_args)
await self._wait_model_patcher_quiescence_safe(
fail_loud=False,
timeout_ms=120000,
marker="EX:boundary_cleanup_wait_idle",
)
await self._flush_running_extensions_transport_state_safe()
comfy.model_management.unload_all_models()
comfy.model_management.cleanup_models_gc()
comfy.model_management.cleanup_models()
gc.collect()
comfy.model_management.soft_empty_cache()
except Exception:
logging.debug("][ EX:isolation_boundary_cleanup_start failed", exc_info=True)
dynamic_prompt = DynamicPrompt(prompt)
reset_progress_state(prompt_id, dynamic_prompt)
add_progress_handler(WebUIProgressHandler(self.server))
@@ -727,6 +831,18 @@ class PromptExecutor:
for node_id in list(execute_outputs):
execution_list.add_node(node_id)
if args.use_process_isolation:
pending_class_types = set()
for node_id in execution_list.pendingNodes.keys():
class_type = dynamic_prompt.get_node(node_id)["class_type"]
pending_class_types.add(class_type)
await self._wait_model_patcher_quiescence_safe(
fail_loud=True,
timeout_ms=120000,
marker="EX:notify_graph_wait_idle",
)
await self._notify_execution_graph_safe(pending_class_types, fail_loud=True)
while not execution_list.is_empty():
node_id, error, ex = await execution_list.stage_node_execution()
if error is not None:
@@ -757,6 +873,7 @@ class PromptExecutor:
"outputs": ui_outputs,
"meta": meta_outputs,
}
comfy.model_management.cleanup_models_gc()
self.server.last_node_id = None
if comfy.model_management.DISABLE_SMART_MEMORY:
comfy.model_management.unload_all_models()