fix(isolation-lifecycle): execution/model ejection parity + fenced sampler device handling

add pyisolate==0.9.1 to requirements.txt
This commit is contained in:
John Pollock
2026-02-27 13:07:23 -06:00
parent a1c3124821
commit 3c8ba051b6
8 changed files with 229 additions and 56 deletions

View File

@@ -1,7 +1,9 @@
import copy
import gc
import heapq
import inspect
import logging
import os
import sys
import threading
import time
@@ -261,20 +263,31 @@ async def _async_map_node_over_list(prompt_id, unique_id, obj, input_data_all, f
pre_execute_cb(index)
# V3
if isinstance(obj, _ComfyNodeInternal) or (is_class(obj) and issubclass(obj, _ComfyNodeInternal)):
# if is just a class, then assign no state, just create clone
if is_class(obj):
type_obj = obj
obj.VALIDATE_CLASS()
class_clone = obj.PREPARE_CLASS_CLONE(v3_data)
# otherwise, use class instance to populate/reuse some fields
# Check for isolated node - skip validation and class cloning
if hasattr(obj, "_pyisolate_extension"):
# Isolated Node: The stub is just a proxy; real validation happens in child process
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# Inject hidden inputs so they're available in the isolated child process
inputs.update(v3_data.get("hidden_inputs", {}))
f = getattr(obj, func)
# Standard V3 Node (Existing Logic)
else:
type_obj = type(obj)
type_obj.VALIDATE_CLASS()
class_clone = type_obj.PREPARE_CLASS_CLONE(v3_data)
f = make_locked_method_func(type_obj, func, class_clone)
# in case of dynamic inputs, restructure inputs to expected nested dict
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# if is just a class, then assign no resources or state, just create clone
if is_class(obj):
type_obj = obj
obj.VALIDATE_CLASS()
class_clone = obj.PREPARE_CLASS_CLONE(v3_data)
# otherwise, use class instance to populate/reuse some fields
else:
type_obj = type(obj)
type_obj.VALIDATE_CLASS()
class_clone = type_obj.PREPARE_CLASS_CLONE(v3_data)
f = make_locked_method_func(type_obj, func, class_clone)
# in case of dynamic inputs, restructure inputs to expected nested dict
if v3_data is not None:
inputs = _io.build_nested_inputs(inputs, v3_data)
# V1
else:
f = getattr(obj, func)
@@ -536,6 +549,14 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed,
tasks = [x for x in output_data if isinstance(x, asyncio.Task)]
await asyncio.gather(*tasks, return_exceptions=True)
unblock()
# Keep isolation node execution deterministic by default, but allow
# opt-out for diagnostics.
isolation_sequential = os.environ.get("COMFY_ISOLATE_SEQUENTIAL", "1").lower() in ("1", "true", "yes")
if args.use_process_isolation and isolation_sequential:
await await_completion()
return await execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_outputs)
asyncio.create_task(await_completion())
return (ExecutionResult.PENDING, None, None)
if len(output_ui) > 0:
@@ -647,6 +668,22 @@ class PromptExecutor:
self.status_messages = []
self.success = True
async def _notify_execution_graph_safe(self, class_types: set[str], *, fail_loud: bool = False) -> None:
try:
from comfy.isolation import notify_execution_graph
await notify_execution_graph(class_types)
except Exception:
if fail_loud:
raise
logging.debug("][ EX:notify_execution_graph failed", exc_info=True)
async def _flush_running_extensions_transport_state_safe(self) -> None:
try:
from comfy.isolation import flush_running_extensions_transport_state
await flush_running_extensions_transport_state()
except Exception:
logging.debug("][ EX:flush_running_extensions_transport_state failed", exc_info=True)
def add_message(self, event, data: dict, broadcast: bool):
data = {
**data,
@@ -688,6 +725,17 @@ class PromptExecutor:
asyncio.run(self.execute_async(prompt, prompt_id, extra_data, execute_outputs))
async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs=[]):
# Update RPC event loops for all isolated extensions
# This is critical for serial workflow execution - each asyncio.run() creates
# a new event loop, and RPC instances must be updated to use it
try:
from comfy.isolation import update_rpc_event_loops
update_rpc_event_loops()
except ImportError:
pass # Isolation not available
except Exception as e:
logging.getLogger(__name__).warning(f"Failed to update RPC event loops: {e}")
set_preview_method(extra_data.get("preview_method"))
nodes.interrupt_processing(False)
@@ -701,6 +749,20 @@ class PromptExecutor:
self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False)
with torch.inference_mode():
if args.use_process_isolation:
try:
# Boundary cleanup runs at the start of the next workflow in
# isolation mode, matching non-isolated "next prompt" timing.
self.caches = CacheSet(cache_type=self.cache_type, cache_args=self.cache_args)
await self._flush_running_extensions_transport_state_safe()
comfy.model_management.unload_all_models()
comfy.model_management.cleanup_models_gc()
comfy.model_management.cleanup_models()
gc.collect()
comfy.model_management.soft_empty_cache()
except Exception:
logging.debug("][ EX:isolation_boundary_cleanup_start failed", exc_info=True)
dynamic_prompt = DynamicPrompt(prompt)
reset_progress_state(prompt_id, dynamic_prompt)
add_progress_handler(WebUIProgressHandler(self.server))
@@ -727,6 +789,13 @@ class PromptExecutor:
for node_id in list(execute_outputs):
execution_list.add_node(node_id)
if args.use_process_isolation:
pending_class_types = set()
for node_id in execution_list.pendingNodes.keys():
class_type = dynamic_prompt.get_node(node_id)["class_type"]
pending_class_types.add(class_type)
await self._notify_execution_graph_safe(pending_class_types, fail_loud=True)
while not execution_list.is_empty():
node_id, error, ex = await execution_list.stage_node_execution()
if error is not None:
@@ -757,6 +826,7 @@ class PromptExecutor:
"outputs": ui_outputs,
"meta": meta_outputs,
}
comfy.model_management.cleanup_models_gc()
self.server.last_node_id = None
if comfy.model_management.DISABLE_SMART_MEMORY:
comfy.model_management.unload_all_models()