Compare commits

..

6 Commits

Author SHA1 Message Date
Jacob Segal
e9db9554bd ACTUALLY skip timing checks in CI 2025-09-03 12:30:44 -07:00
Jacob Segal
98be8e1969 Whoops, reverted a change in the last commit 2025-09-03 12:21:13 -07:00
Jacob Segal
766ff74207 Just disable timing-related assertions in CI
That way there's no risk of periodic non-deterministic test failures.
2025-09-03 12:18:48 -07:00
Jacob Segal
b1b5f87534 Add more leeway on async tests for Windows CI 2025-09-02 23:43:15 -07:00
Jacob Segal
cf45fd1742 Add missing test modules 2025-09-02 23:21:43 -07:00
Jacob Segal
e7314f49e6 Fix showing progress from other sessions
Because `client_id` was missing from ths `progress_state` message, it
was being sent to all connected sessions. This technically meant that if
someone had a graph with the same nodes, they would see the progress
updates for others.

Also added a test to prevent reoccurance and moved the tests around to
make CI easier to hook up.
2025-09-02 23:17:26 -07:00
35 changed files with 715 additions and 1115 deletions

30
.github/workflows/test-execution.yml vendored Normal file
View File

@@ -0,0 +1,30 @@
name: Execution Tests
on:
push:
branches: [ main, master ]
pull_request:
branches: [ main, master ]
jobs:
test:
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
continue-on-error: true
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install requirements
run: |
python -m pip install --upgrade pip
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
pip install -r requirements.txt
pip install -r tests-unit/requirements.txt
- name: Run Execution Tests
run: |
python -m pytest tests/execution -v --skip-timing-checks

View File

@@ -61,12 +61,8 @@ class CLIPEncoder(torch.nn.Module):
def forward(self, x, mask=None, intermediate_output=None):
optimized_attention = optimized_attention_for_device(x.device, mask=mask is not None, small_input=True)
all_intermediate = None
if intermediate_output is not None:
if intermediate_output == "all":
all_intermediate = []
intermediate_output = None
elif intermediate_output < 0:
if intermediate_output < 0:
intermediate_output = len(self.layers) + intermediate_output
intermediate = None
@@ -74,12 +70,6 @@ class CLIPEncoder(torch.nn.Module):
x = l(x, mask, optimized_attention)
if i == intermediate_output:
intermediate = x.clone()
if all_intermediate is not None:
all_intermediate.append(x.unsqueeze(1).clone())
if all_intermediate is not None:
intermediate = torch.cat(all_intermediate, dim=1)
return x, intermediate
class CLIPEmbeddings(torch.nn.Module):

View File

@@ -50,13 +50,7 @@ class ClipVisionModel():
self.image_size = config.get("image_size", 224)
self.image_mean = config.get("image_mean", [0.48145466, 0.4578275, 0.40821073])
self.image_std = config.get("image_std", [0.26862954, 0.26130258, 0.27577711])
model_type = config.get("model_type", "clip_vision_model")
model_class = IMAGE_ENCODERS.get(model_type)
if model_type == "siglip_vision_model":
self.return_all_hidden_states = True
else:
self.return_all_hidden_states = False
model_class = IMAGE_ENCODERS.get(config.get("model_type", "clip_vision_model"))
self.load_device = comfy.model_management.text_encoder_device()
offload_device = comfy.model_management.text_encoder_offload_device()
self.dtype = comfy.model_management.text_encoder_dtype(self.load_device)
@@ -74,18 +68,12 @@ class ClipVisionModel():
def encode_image(self, image, crop=True):
comfy.model_management.load_model_gpu(self.patcher)
pixel_values = clip_preprocess(image.to(self.load_device), size=self.image_size, mean=self.image_mean, std=self.image_std, crop=crop).float()
out = self.model(pixel_values=pixel_values, intermediate_output='all' if self.return_all_hidden_states else -2)
out = self.model(pixel_values=pixel_values, intermediate_output=-2)
outputs = Output()
outputs["last_hidden_state"] = out[0].to(comfy.model_management.intermediate_device())
outputs["image_embeds"] = out[2].to(comfy.model_management.intermediate_device())
if self.return_all_hidden_states:
all_hs = out[1].to(comfy.model_management.intermediate_device())
outputs["penultimate_hidden_states"] = all_hs[:, -2]
outputs["all_hidden_states"] = all_hs
else:
outputs["penultimate_hidden_states"] = out[1].to(comfy.model_management.intermediate_device())
outputs["penultimate_hidden_states"] = out[1].to(comfy.model_management.intermediate_device())
outputs["mm_projected"] = out[3]
return outputs

View File

@@ -632,7 +632,7 @@ class ContinuousTransformer(nn.Module):
# Attention layers
if self.rotary_pos_emb is not None:
rotary_pos_emb = self.rotary_pos_emb.forward_from_seq_len(x.shape[1], dtype=torch.float, device=x.device)
rotary_pos_emb = self.rotary_pos_emb.forward_from_seq_len(x.shape[1], dtype=x.dtype, device=x.device)
else:
rotary_pos_emb = None

View File

@@ -106,7 +106,6 @@ class Flux(nn.Module):
if y is None:
y = torch.zeros((img.shape[0], self.params.vec_in_dim), device=img.device, dtype=img.dtype)
patches = transformer_options.get("patches", {})
patches_replace = transformer_options.get("patches_replace", {})
if img.ndim != 3 or txt.ndim != 3:
raise ValueError("Input img and txt tensors must have 3 dimensions.")
@@ -118,17 +117,9 @@ class Flux(nn.Module):
if guidance is not None:
vec = vec + self.guidance_in(timestep_embedding(guidance, 256).to(img.dtype))
vec = vec + self.vector_in(y[:, :self.params.vec_in_dim])
vec = vec + self.vector_in(y[:,:self.params.vec_in_dim])
txt = self.txt_in(txt)
if "post_input" in patches:
for p in patches["post_input"]:
out = p({"img": img, "txt": txt, "img_ids": img_ids, "txt_ids": txt_ids})
img = out["img"]
txt = out["txt"]
img_ids = out["img_ids"]
txt_ids = out["txt_ids"]
if img_ids is not None:
ids = torch.cat((txt_ids, img_ids), dim=1)
pe = self.pe_embedder(ids)
@@ -248,7 +239,7 @@ class Flux(nn.Module):
index += 1
h_offset = 0
w_offset = 0
elif ref_latents_method == "uxo":
elif ref_latents_method == "uso":
index = 0
h_offset = h_len * patch_size + h
w_offset = w_len * patch_size + w

View File

@@ -433,9 +433,6 @@ class ModelPatcher:
def set_model_double_block_patch(self, patch):
self.set_model_patch(patch, "double_block")
def set_model_post_input_patch(self, patch):
self.set_model_patch(patch, "post_input")
def add_object_patch(self, name, obj):
self.object_patches[name] = obj

View File

@@ -951,11 +951,7 @@ class MagicPrompt2(str, Enum):
class StyleType1(str, Enum):
AUTO = 'AUTO'
GENERAL = 'GENERAL'
REALISTIC = 'REALISTIC'
DESIGN = 'DESIGN'
FICTION = 'FICTION'
class ImagenImageGenerationInstance(BaseModel):
@@ -2680,7 +2676,7 @@ class ReleaseNote(BaseModel):
class RenderingSpeed(str, Enum):
DEFAULT = 'DEFAULT'
BALANCED = 'BALANCED'
TURBO = 'TURBO'
QUALITY = 'QUALITY'
@@ -4922,14 +4918,6 @@ class IdeogramV3EditRequest(BaseModel):
None,
description='A set of images to use as style references (maximum total size 10MB across all style references). The images should be in JPEG, PNG or WebP format.',
)
character_reference_images: Optional[List[str]] = Field(
None,
description='Generations with character reference are subject to the character reference pricing. A set of images to use as character references (maximum total size 10MB across all character references), currently only supports 1 character reference image. The images should be in JPEG, PNG or WebP format.'
)
character_reference_images_mask: Optional[List[str]] = Field(
None,
description='Optional masks for character reference images. When provided, must match the number of character_reference_images. Each mask should be a grayscale image of the same dimensions as the corresponding character reference image. The images should be in JPEG, PNG or WebP format.'
)
class IdeogramV3Request(BaseModel):
@@ -4963,14 +4951,6 @@ class IdeogramV3Request(BaseModel):
style_type: Optional[StyleType1] = Field(
None, description='The type of style to apply'
)
character_reference_images: Optional[List[str]] = Field(
None,
description='Generations with character reference are subject to the character reference pricing. A set of images to use as character references (maximum total size 10MB across all character references), currently only supports 1 character reference image. The images should be in JPEG, PNG or WebP format.'
)
character_reference_images_mask: Optional[List[str]] = Field(
None,
description='Optional masks for character reference images. When provided, must match the number of character_reference_images. Each mask should be a grayscale image of the same dimensions as the corresponding character reference image. The images should be in JPEG, PNG or WebP format.'
)
class ImagenGenerateImageResponse(BaseModel):

View File

@@ -1,336 +0,0 @@
import logging
from enum import Enum
from typing import Optional
from typing_extensions import override
import torch
from pydantic import BaseModel, Field
from comfy_api.latest import ComfyExtension, io as comfy_io
from comfy_api_nodes.util.validation_utils import (
validate_image_aspect_ratio_range,
get_number_of_images,
)
from comfy_api_nodes.apis.client import (
ApiEndpoint,
HttpMethod,
SynchronousOperation,
)
from comfy_api_nodes.apinode_utils import download_url_to_image_tensor, upload_images_to_comfyapi, validate_string
BYTEPLUS_ENDPOINT = "/proxy/byteplus/api/v3/images/generations"
class Text2ImageModelName(str, Enum):
seedream3 = "seedream-3-0-t2i-250415"
class Image2ImageModelName(str, Enum):
seededit3 = "seededit-3-0-i2i-250628"
class Text2ImageTaskCreationRequest(BaseModel):
model: Text2ImageModelName = Text2ImageModelName.seedream3
prompt: str = Field(...)
response_format: Optional[str] = Field("url")
size: Optional[str] = Field(None)
seed: Optional[int] = Field(0, ge=0, le=2147483647)
guidance_scale: Optional[float] = Field(..., ge=1.0, le=10.0)
watermark: Optional[bool] = Field(True)
class Image2ImageTaskCreationRequest(BaseModel):
model: Image2ImageModelName = Image2ImageModelName.seededit3
prompt: str = Field(...)
response_format: Optional[str] = Field("url")
image: str = Field(..., description="Base64 encoded string or image URL")
size: Optional[str] = Field("adaptive")
seed: Optional[int] = Field(..., ge=0, le=2147483647)
guidance_scale: Optional[float] = Field(..., ge=1.0, le=10.0)
watermark: Optional[bool] = Field(True)
class ImageTaskCreationResponse(BaseModel):
model: str = Field(...)
created: int = Field(..., description="Unix timestamp (in seconds) indicating time when the request was created.")
data: list = Field([], description="Contains information about the generated image(s).")
error: dict = Field({}, description="Contains `code` and `message` fields in case of error.")
RECOMMENDED_PRESETS = [
("1024x1024 (1:1)", 1024, 1024),
("864x1152 (3:4)", 864, 1152),
("1152x864 (4:3)", 1152, 864),
("1280x720 (16:9)", 1280, 720),
("720x1280 (9:16)", 720, 1280),
("832x1248 (2:3)", 832, 1248),
("1248x832 (3:2)", 1248, 832),
("1512x648 (21:9)", 1512, 648),
("2048x2048 (1:1)", 2048, 2048),
("Custom", None, None),
]
def get_image_url_from_response(response: ImageTaskCreationResponse) -> str:
if response.error:
error_msg = f"ByteDance request failed. Code: {response.error['code']}, message: {response.error['message']}"
logging.info(error_msg)
raise RuntimeError(error_msg)
logging.info("ByteDance task succeeded, image URL: %s", response.data[0]["url"])
return response.data[0]["url"]
class ByteDanceImageNode(comfy_io.ComfyNode):
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="ByteDanceImageNode",
display_name="ByteDance Image",
category="api node/image/ByteDance",
description="Generate images using ByteDance models via api based on prompt",
inputs=[
comfy_io.Combo.Input(
"model",
options=[model.value for model in Text2ImageModelName],
default=Text2ImageModelName.seedream3.value,
tooltip="Model name",
),
comfy_io.String.Input(
"prompt",
multiline=True,
tooltip="The text prompt used to generate the image",
),
comfy_io.Combo.Input(
"size_preset",
options=[label for label, _, _ in RECOMMENDED_PRESETS],
tooltip="Pick a recommended size. Select Custom to use the width and height below",
),
comfy_io.Int.Input(
"width",
default=1024,
min=512,
max=2048,
step=64,
tooltip="Custom width for image. Value is working only if `size_preset` is set to `Custom`",
),
comfy_io.Int.Input(
"height",
default=1024,
min=512,
max=2048,
step=64,
tooltip="Custom height for image. Value is working only if `size_preset` is set to `Custom`",
),
comfy_io.Int.Input(
"seed",
default=0,
min=0,
max=2147483647,
step=1,
display_mode=comfy_io.NumberDisplay.number,
control_after_generate=True,
tooltip="Seed to use for generation",
optional=True,
),
comfy_io.Float.Input(
"guidance_scale",
default=2.5,
min=1.0,
max=10.0,
step=0.01,
display_mode=comfy_io.NumberDisplay.number,
tooltip="Higher value makes the image follow the prompt more closely",
optional=True,
),
comfy_io.Boolean.Input(
"watermark",
default=True,
tooltip="Whether to add an \"AI generated\" watermark to the image",
optional=True,
),
],
outputs=[
comfy_io.Image.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
)
@classmethod
async def execute(
cls,
model: str,
prompt: str,
size_preset: str,
width: int,
height: int,
seed: int,
guidance_scale: float,
watermark: bool,
) -> comfy_io.NodeOutput:
validate_string(prompt, strip_whitespace=True, min_length=1)
w = h = None
for label, tw, th in RECOMMENDED_PRESETS:
if label == size_preset:
w, h = tw, th
break
if w is None or h is None:
w, h = width, height
if not (512 <= w <= 2048) or not (512 <= h <= 2048):
raise ValueError(
f"Custom size out of range: {w}x{h}. "
"Both width and height must be between 512 and 2048 pixels."
)
payload = Text2ImageTaskCreationRequest(
model=model,
prompt=prompt,
size=f"{w}x{h}",
seed=seed,
guidance_scale=guidance_scale,
watermark=watermark,
)
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
response = await SynchronousOperation(
endpoint=ApiEndpoint(
path=BYTEPLUS_ENDPOINT,
method=HttpMethod.POST,
request_model=Text2ImageTaskCreationRequest,
response_model=ImageTaskCreationResponse,
),
request=payload,
auth_kwargs=auth_kwargs,
).execute()
return comfy_io.NodeOutput(await download_url_to_image_tensor(get_image_url_from_response(response)))
class ByteDanceImageEditNode(comfy_io.ComfyNode):
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="ByteDanceImageEditNode",
display_name="ByteDance Image Edit",
category="api node/video/ByteDance",
description="Edit images using ByteDance models via api based on prompt",
inputs=[
comfy_io.Combo.Input(
"model",
options=[model.value for model in Image2ImageModelName],
default=Image2ImageModelName.seededit3.value,
tooltip="Model name",
),
comfy_io.Image.Input(
"image",
tooltip="The base image to edit",
),
comfy_io.String.Input(
"prompt",
multiline=True,
default="",
tooltip="Instruction to edit image",
),
comfy_io.Int.Input(
"seed",
default=0,
min=0,
max=2147483647,
step=1,
display_mode=comfy_io.NumberDisplay.number,
control_after_generate=True,
tooltip="Seed to use for generation",
optional=True,
),
comfy_io.Float.Input(
"guidance_scale",
default=5.5,
min=1.0,
max=10.0,
step=0.01,
display_mode=comfy_io.NumberDisplay.number,
tooltip="Higher value makes the image follow the prompt more closely",
optional=True,
),
comfy_io.Boolean.Input(
"watermark",
default=True,
tooltip="Whether to add an \"AI generated\" watermark to the image",
optional=True,
),
],
outputs=[
comfy_io.Image.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
)
@classmethod
async def execute(
cls,
model: str,
image: torch.Tensor,
prompt: str,
seed: int,
guidance_scale: float,
watermark: bool,
) -> comfy_io.NodeOutput:
validate_string(prompt, strip_whitespace=True, min_length=1)
if get_number_of_images(image) != 1:
raise ValueError("Exactly one input image is required.")
validate_image_aspect_ratio_range(image, (1, 3), (3, 1))
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
source_url = (await upload_images_to_comfyapi(
image,
max_images=1,
mime_type="image/png",
auth_kwargs=auth_kwargs,
))[0]
payload = Image2ImageTaskCreationRequest(
model=model,
prompt=prompt,
image=source_url,
seed=seed,
guidance_scale=guidance_scale,
watermark=watermark,
)
response = await SynchronousOperation(
endpoint=ApiEndpoint(
path=BYTEPLUS_ENDPOINT,
method=HttpMethod.POST,
request_model=Image2ImageTaskCreationRequest,
response_model=ImageTaskCreationResponse,
),
request=payload,
auth_kwargs=auth_kwargs,
).execute()
return comfy_io.NodeOutput(await download_url_to_image_tensor(get_image_url_from_response(response)))
class ByteDanceExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[comfy_io.ComfyNode]]:
return [
ByteDanceImageNode,
ByteDanceImageEditNode,
]
async def comfy_entrypoint() -> ByteDanceExtension:
return ByteDanceExtension()

View File

@@ -255,7 +255,6 @@ class IdeogramV1(comfy_io.ComfyNode):
display_name="Ideogram V1",
category="api node/image/Ideogram",
description="Generates images using the Ideogram V1 model.",
is_api_node=True,
inputs=[
comfy_io.String.Input(
"prompt",
@@ -384,7 +383,6 @@ class IdeogramV2(comfy_io.ComfyNode):
display_name="Ideogram V2",
category="api node/image/Ideogram",
description="Generates images using the Ideogram V2 model.",
is_api_node=True,
inputs=[
comfy_io.String.Input(
"prompt",
@@ -554,7 +552,6 @@ class IdeogramV3(comfy_io.ComfyNode):
category="api node/image/Ideogram",
description="Generates images using the Ideogram V3 model. "
"Supports both regular image generation from text prompts and image editing with mask.",
is_api_node=True,
inputs=[
comfy_io.String.Input(
"prompt",
@@ -615,21 +612,11 @@ class IdeogramV3(comfy_io.ComfyNode):
),
comfy_io.Combo.Input(
"rendering_speed",
options=["DEFAULT", "TURBO", "QUALITY"],
default="DEFAULT",
options=["BALANCED", "TURBO", "QUALITY"],
default="BALANCED",
tooltip="Controls the trade-off between generation speed and quality",
optional=True,
),
comfy_io.Image.Input(
"character_image",
tooltip="Image to use as character reference.",
optional=True,
),
comfy_io.Mask.Input(
"character_mask",
tooltip="Optional mask for character reference image.",
optional=True,
),
],
outputs=[
comfy_io.Image.Output(),
@@ -652,46 +639,12 @@ class IdeogramV3(comfy_io.ComfyNode):
magic_prompt_option="AUTO",
seed=0,
num_images=1,
rendering_speed="DEFAULT",
character_image=None,
character_mask=None,
rendering_speed="BALANCED",
):
auth = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
if rendering_speed == "BALANCED": # for backward compatibility
rendering_speed = "DEFAULT"
character_img_binary = None
character_mask_binary = None
if character_image is not None:
input_tensor = character_image.squeeze().cpu()
if character_mask is not None:
character_mask = resize_mask_to_image(character_mask, character_image, allow_gradient=False)
character_mask = 1.0 - character_mask
if character_mask.shape[1:] != character_image.shape[1:-1]:
raise Exception("Character mask and image must be the same size")
mask_np = (character_mask.squeeze().cpu().numpy() * 255).astype(np.uint8)
mask_img = Image.fromarray(mask_np)
mask_byte_arr = BytesIO()
mask_img.save(mask_byte_arr, format="PNG")
mask_byte_arr.seek(0)
character_mask_binary = mask_byte_arr
character_mask_binary.name = "mask.png"
img_np = (input_tensor.numpy() * 255).astype(np.uint8)
img = Image.fromarray(img_np)
img_byte_arr = BytesIO()
img.save(img_byte_arr, format="PNG")
img_byte_arr.seek(0)
character_img_binary = img_byte_arr
character_img_binary.name = "image.png"
elif character_mask is not None:
raise Exception("Character mask requires character image to be present")
# Check if both image and mask are provided for editing mode
if image is not None and mask is not None:
# Edit mode
@@ -740,15 +693,6 @@ class IdeogramV3(comfy_io.ComfyNode):
if num_images > 1:
edit_request.num_images = num_images
files = {
"image": img_binary,
"mask": mask_binary,
}
if character_img_binary:
files["character_reference_images"] = character_img_binary
if character_mask_binary:
files["character_mask_binary"] = character_mask_binary
# Execute the operation for edit mode
operation = SynchronousOperation(
endpoint=ApiEndpoint(
@@ -758,7 +702,10 @@ class IdeogramV3(comfy_io.ComfyNode):
response_model=IdeogramGenerateResponse,
),
request=edit_request,
files=files,
files={
"image": img_binary,
"mask": mask_binary,
},
content_type="multipart/form-data",
auth_kwargs=auth,
)
@@ -792,14 +739,6 @@ class IdeogramV3(comfy_io.ComfyNode):
if num_images > 1:
gen_request.num_images = num_images
files = {}
if character_img_binary:
files["character_reference_images"] = character_img_binary
if character_mask_binary:
files["character_mask_binary"] = character_mask_binary
if files:
gen_request.style_type = "AUTO"
# Execute the operation for generation mode
operation = SynchronousOperation(
endpoint=ApiEndpoint(
@@ -809,8 +748,6 @@ class IdeogramV3(comfy_io.ComfyNode):
response_model=IdeogramGenerateResponse,
),
request=gen_request,
files=files if files else None,
content_type="multipart/form-data",
auth_kwargs=auth,
)

View File

@@ -12,7 +12,6 @@ User Guides:
"""
from typing import Union, Optional, Any
from typing_extensions import override
from enum import Enum
import torch
@@ -47,9 +46,9 @@ from comfy_api_nodes.apinode_utils import (
validate_string,
download_url_to_image_tensor,
)
from comfy_api_nodes.mapper_utils import model_field_to_node_input
from comfy_api.input_impl import VideoFromFile
from comfy_api.latest import ComfyExtension, io as comfy_io
from comfy_api_nodes.util.validation_utils import validate_image_dimensions, validate_image_aspect_ratio
from comfy.comfy_types.node_typing import IO, ComfyNodeABC
PATH_IMAGE_TO_VIDEO = "/proxy/runway/image_to_video"
PATH_TEXT_TO_IMAGE = "/proxy/runway/text_to_image"
@@ -86,11 +85,20 @@ class RunwayGen3aAspectRatio(str, Enum):
def get_video_url_from_task_status(response: TaskStatusResponse) -> Union[str, None]:
"""Returns the video URL from the task status response if it exists."""
if hasattr(response, "output") and len(response.output) > 0:
if response.output and len(response.output) > 0:
return response.output[0]
return None
# TODO: replace with updated image validation utils (upstream)
def validate_input_image(image: torch.Tensor) -> bool:
"""
Validate the input image is within the size limits for the Runway API.
See: https://docs.dev.runwayml.com/assets/inputs/#common-error-reasons
"""
return image.shape[2] < 8000 and image.shape[1] < 8000
async def poll_until_finished(
auth_kwargs: dict[str, str],
api_endpoint: ApiEndpoint[Any, TaskStatusResponse],
@@ -126,438 +134,458 @@ def extract_progress_from_task_status(
def get_image_url_from_task_status(response: TaskStatusResponse) -> Union[str, None]:
"""Returns the image URL from the task status response if it exists."""
if hasattr(response, "output") and len(response.output) > 0:
if response.output and len(response.output) > 0:
return response.output[0]
return None
async def get_response(
task_id: str, auth_kwargs: dict[str, str], node_id: Optional[str] = None, estimated_duration: Optional[int] = None
) -> TaskStatusResponse:
"""Poll the task status until it is finished then get the response."""
return await poll_until_finished(
auth_kwargs,
ApiEndpoint(
path=f"{PATH_GET_TASK_STATUS}/{task_id}",
method=HttpMethod.GET,
request_model=EmptyRequest,
response_model=TaskStatusResponse,
),
estimated_duration=estimated_duration,
node_id=node_id,
)
class RunwayVideoGenNode(ComfyNodeABC):
"""Runway Video Node Base."""
RETURN_TYPES = ("VIDEO",)
FUNCTION = "api_call"
CATEGORY = "api node/video/Runway"
API_NODE = True
async def generate_video(
request: RunwayImageToVideoRequest,
auth_kwargs: dict[str, str],
node_id: Optional[str] = None,
estimated_duration: Optional[int] = None,
) -> VideoFromFile:
initial_operation = SynchronousOperation(
endpoint=ApiEndpoint(
path=PATH_IMAGE_TO_VIDEO,
method=HttpMethod.POST,
request_model=RunwayImageToVideoRequest,
response_model=RunwayImageToVideoResponse,
),
request=request,
auth_kwargs=auth_kwargs,
)
def validate_task_created(self, response: RunwayImageToVideoResponse) -> bool:
"""
Validate the task creation response from the Runway API matches
expected format.
"""
if not bool(response.id):
raise RunwayApiError("Invalid initial response from Runway API.")
return True
initial_response = await initial_operation.execute()
def validate_response(self, response: RunwayImageToVideoResponse) -> bool:
"""
Validate the successful task status response from the Runway API
matches expected format.
"""
if not response.output or len(response.output) == 0:
raise RunwayApiError(
"Runway task succeeded but no video data found in response."
)
return True
final_response = await get_response(initial_response.id, auth_kwargs, node_id, estimated_duration)
if not final_response.output:
raise RunwayApiError("Runway task succeeded but no video data found in response.")
video_url = get_video_url_from_task_status(final_response)
return await download_url_to_video_output(video_url)
class RunwayImageToVideoNodeGen3a(comfy_io.ComfyNode):
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="RunwayImageToVideoNodeGen3a",
display_name="Runway Image to Video (Gen3a Turbo)",
category="api node/video/Runway",
description="Generate a video from a single starting frame using Gen3a Turbo model. "
"Before diving in, review these best practices to ensure that "
"your input selections will set your generation up for success: "
"https://help.runwayml.com/hc/en-us/articles/33927968552339-Creating-with-Act-One-on-Gen-3-Alpha-and-Turbo.",
inputs=[
comfy_io.String.Input(
"prompt",
multiline=True,
default="",
tooltip="Text prompt for the generation",
),
comfy_io.Image.Input(
"start_frame",
tooltip="Start frame to be used for the video",
),
comfy_io.Combo.Input(
"duration",
options=[model.value for model in Duration],
),
comfy_io.Combo.Input(
"ratio",
options=[model.value for model in RunwayGen3aAspectRatio],
),
comfy_io.Int.Input(
"seed",
default=0,
min=0,
max=4294967295,
step=1,
control_after_generate=True,
display_mode=comfy_io.NumberDisplay.number,
tooltip="Random seed for generation",
),
],
outputs=[
comfy_io.Video.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
async def get_response(
self, task_id: str, auth_kwargs: dict[str, str], node_id: Optional[str] = None
) -> RunwayImageToVideoResponse:
"""Poll the task status until it is finished then get the response."""
return await poll_until_finished(
auth_kwargs,
ApiEndpoint(
path=f"{PATH_GET_TASK_STATUS}/{task_id}",
method=HttpMethod.GET,
request_model=EmptyRequest,
response_model=TaskStatusResponse,
),
estimated_duration=AVERAGE_DURATION_FLF_SECONDS,
node_id=node_id,
)
async def generate_video(
self,
request: RunwayImageToVideoRequest,
auth_kwargs: dict[str, str],
node_id: Optional[str] = None,
) -> tuple[VideoFromFile]:
initial_operation = SynchronousOperation(
endpoint=ApiEndpoint(
path=PATH_IMAGE_TO_VIDEO,
method=HttpMethod.POST,
request_model=RunwayImageToVideoRequest,
response_model=RunwayImageToVideoResponse,
),
request=request,
auth_kwargs=auth_kwargs,
)
initial_response = await initial_operation.execute()
self.validate_task_created(initial_response)
task_id = initial_response.id
final_response = await self.get_response(task_id, auth_kwargs, node_id)
self.validate_response(final_response)
video_url = get_video_url_from_task_status(final_response)
return (await download_url_to_video_output(video_url),)
class RunwayImageToVideoNodeGen3a(RunwayVideoGenNode):
"""Runway Image to Video Node using Gen3a Turbo model."""
DESCRIPTION = "Generate a video from a single starting frame using Gen3a Turbo model. Before diving in, review these best practices to ensure that your input selections will set your generation up for success: https://help.runwayml.com/hc/en-us/articles/33927968552339-Creating-with-Act-One-on-Gen-3-Alpha-and-Turbo."
@classmethod
async def execute(
cls,
def INPUT_TYPES(s):
return {
"required": {
"prompt": model_field_to_node_input(
IO.STRING, RunwayImageToVideoRequest, "promptText", multiline=True
),
"start_frame": (
IO.IMAGE,
{"tooltip": "Start frame to be used for the video"},
),
"duration": model_field_to_node_input(
IO.COMBO, RunwayImageToVideoRequest, "duration", enum_type=Duration
),
"ratio": model_field_to_node_input(
IO.COMBO,
RunwayImageToVideoRequest,
"ratio",
enum_type=RunwayGen3aAspectRatio,
),
"seed": model_field_to_node_input(
IO.INT,
RunwayImageToVideoRequest,
"seed",
control_after_generate=True,
),
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
async def api_call(
self,
prompt: str,
start_frame: torch.Tensor,
duration: str,
ratio: str,
seed: int,
) -> comfy_io.NodeOutput:
unique_id: Optional[str] = None,
**kwargs,
) -> tuple[VideoFromFile]:
# Validate inputs
validate_string(prompt, min_length=1)
validate_image_dimensions(start_frame, max_width=7999, max_height=7999)
validate_image_aspect_ratio(start_frame, min_aspect_ratio=0.5, max_aspect_ratio=2.0)
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
validate_input_image(start_frame)
# Upload image
download_urls = await upload_images_to_comfyapi(
start_frame,
max_images=1,
mime_type="image/png",
auth_kwargs=auth_kwargs,
auth_kwargs=kwargs,
)
if len(download_urls) != 1:
raise RunwayApiError("Failed to upload one or more images to comfy api.")
return comfy_io.NodeOutput(
await generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen3a_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
)
]
),
return await self.generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen3a_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
)
]
),
auth_kwargs=auth_kwargs,
node_id=cls.hidden.unique_id,
)
),
auth_kwargs=kwargs,
node_id=unique_id,
)
class RunwayImageToVideoNodeGen4(comfy_io.ComfyNode):
class RunwayImageToVideoNodeGen4(RunwayVideoGenNode):
"""Runway Image to Video Node using Gen4 Turbo model."""
DESCRIPTION = "Generate a video from a single starting frame using Gen4 Turbo model. Before diving in, review these best practices to ensure that your input selections will set your generation up for success: https://help.runwayml.com/hc/en-us/articles/37327109429011-Creating-with-Gen-4-Video."
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="RunwayImageToVideoNodeGen4",
display_name="Runway Image to Video (Gen4 Turbo)",
category="api node/video/Runway",
description="Generate a video from a single starting frame using Gen4 Turbo model. "
"Before diving in, review these best practices to ensure that "
"your input selections will set your generation up for success: "
"https://help.runwayml.com/hc/en-us/articles/37327109429011-Creating-with-Gen-4-Video.",
inputs=[
comfy_io.String.Input(
"prompt",
multiline=True,
default="",
tooltip="Text prompt for the generation",
def INPUT_TYPES(s):
return {
"required": {
"prompt": model_field_to_node_input(
IO.STRING, RunwayImageToVideoRequest, "promptText", multiline=True
),
comfy_io.Image.Input(
"start_frame",
tooltip="Start frame to be used for the video",
"start_frame": (
IO.IMAGE,
{"tooltip": "Start frame to be used for the video"},
),
comfy_io.Combo.Input(
"duration",
options=[model.value for model in Duration],
"duration": model_field_to_node_input(
IO.COMBO, RunwayImageToVideoRequest, "duration", enum_type=Duration
),
comfy_io.Combo.Input(
"ratio": model_field_to_node_input(
IO.COMBO,
RunwayImageToVideoRequest,
"ratio",
options=[model.value for model in RunwayGen4TurboAspectRatio],
enum_type=RunwayGen4TurboAspectRatio,
),
comfy_io.Int.Input(
"seed": model_field_to_node_input(
IO.INT,
RunwayImageToVideoRequest,
"seed",
default=0,
min=0,
max=4294967295,
step=1,
control_after_generate=True,
display_mode=comfy_io.NumberDisplay.number,
tooltip="Random seed for generation",
),
],
outputs=[
comfy_io.Video.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
)
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
@classmethod
async def execute(
cls,
async def api_call(
self,
prompt: str,
start_frame: torch.Tensor,
duration: str,
ratio: str,
seed: int,
) -> comfy_io.NodeOutput:
unique_id: Optional[str] = None,
**kwargs,
) -> tuple[VideoFromFile]:
# Validate inputs
validate_string(prompt, min_length=1)
validate_image_dimensions(start_frame, max_width=7999, max_height=7999)
validate_image_aspect_ratio(start_frame, min_aspect_ratio=0.5, max_aspect_ratio=2.0)
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
validate_input_image(start_frame)
# Upload image
download_urls = await upload_images_to_comfyapi(
start_frame,
max_images=1,
mime_type="image/png",
auth_kwargs=auth_kwargs,
auth_kwargs=kwargs,
)
if len(download_urls) != 1:
raise RunwayApiError("Failed to upload one or more images to comfy api.")
return comfy_io.NodeOutput(
await generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen4_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
)
]
),
return await self.generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen4_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
)
]
),
auth_kwargs=auth_kwargs,
node_id=cls.hidden.unique_id,
estimated_duration=AVERAGE_DURATION_FLF_SECONDS,
)
),
auth_kwargs=kwargs,
node_id=unique_id,
)
class RunwayFirstLastFrameNode(comfy_io.ComfyNode):
class RunwayFirstLastFrameNode(RunwayVideoGenNode):
"""Runway First-Last Frame Node."""
DESCRIPTION = "Upload first and last keyframes, draft a prompt, and generate a video. More complex transitions, such as cases where the Last frame is completely different from the First frame, may benefit from the longer 10s duration. This would give the generation more time to smoothly transition between the two inputs. Before diving in, review these best practices to ensure that your input selections will set your generation up for success: https://help.runwayml.com/hc/en-us/articles/34170748696595-Creating-with-Keyframes-on-Gen-3."
async def get_response(
self, task_id: str, auth_kwargs: dict[str, str], node_id: Optional[str] = None
) -> RunwayImageToVideoResponse:
return await poll_until_finished(
auth_kwargs,
ApiEndpoint(
path=f"{PATH_GET_TASK_STATUS}/{task_id}",
method=HttpMethod.GET,
request_model=EmptyRequest,
response_model=TaskStatusResponse,
),
estimated_duration=AVERAGE_DURATION_FLF_SECONDS,
node_id=node_id,
)
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="RunwayFirstLastFrameNode",
display_name="Runway First-Last-Frame to Video",
category="api node/video/Runway",
description="Upload first and last keyframes, draft a prompt, and generate a video. "
"More complex transitions, such as cases where the Last frame is completely different "
"from the First frame, may benefit from the longer 10s duration. "
"This would give the generation more time to smoothly transition between the two inputs. "
"Before diving in, review these best practices to ensure that your input selections "
"will set your generation up for success: "
"https://help.runwayml.com/hc/en-us/articles/34170748696595-Creating-with-Keyframes-on-Gen-3.",
inputs=[
comfy_io.String.Input(
"prompt",
multiline=True,
default="",
tooltip="Text prompt for the generation",
def INPUT_TYPES(s):
return {
"required": {
"prompt": model_field_to_node_input(
IO.STRING, RunwayImageToVideoRequest, "promptText", multiline=True
),
comfy_io.Image.Input(
"start_frame",
tooltip="Start frame to be used for the video",
"start_frame": (
IO.IMAGE,
{"tooltip": "Start frame to be used for the video"},
),
comfy_io.Image.Input(
"end_frame",
tooltip="End frame to be used for the video. Supported for gen3a_turbo only.",
"end_frame": (
IO.IMAGE,
{
"tooltip": "End frame to be used for the video. Supported for gen3a_turbo only."
},
),
comfy_io.Combo.Input(
"duration",
options=[model.value for model in Duration],
"duration": model_field_to_node_input(
IO.COMBO, RunwayImageToVideoRequest, "duration", enum_type=Duration
),
comfy_io.Combo.Input(
"ratio": model_field_to_node_input(
IO.COMBO,
RunwayImageToVideoRequest,
"ratio",
options=[model.value for model in RunwayGen3aAspectRatio],
enum_type=RunwayGen3aAspectRatio,
),
comfy_io.Int.Input(
"seed": model_field_to_node_input(
IO.INT,
RunwayImageToVideoRequest,
"seed",
default=0,
min=0,
max=4294967295,
step=1,
control_after_generate=True,
display_mode=comfy_io.NumberDisplay.number,
tooltip="Random seed for generation",
),
],
outputs=[
comfy_io.Video.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
)
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"unique_id": "UNIQUE_ID",
"comfy_api_key": "API_KEY_COMFY_ORG",
},
}
@classmethod
async def execute(
cls,
async def api_call(
self,
prompt: str,
start_frame: torch.Tensor,
end_frame: torch.Tensor,
duration: str,
ratio: str,
seed: int,
) -> comfy_io.NodeOutput:
unique_id: Optional[str] = None,
**kwargs,
) -> tuple[VideoFromFile]:
# Validate inputs
validate_string(prompt, min_length=1)
validate_image_dimensions(start_frame, max_width=7999, max_height=7999)
validate_image_dimensions(end_frame, max_width=7999, max_height=7999)
validate_image_aspect_ratio(start_frame, min_aspect_ratio=0.5, max_aspect_ratio=2.0)
validate_image_aspect_ratio(end_frame, min_aspect_ratio=0.5, max_aspect_ratio=2.0)
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
validate_input_image(start_frame)
validate_input_image(end_frame)
# Upload images
stacked_input_images = image_tensor_pair_to_batch(start_frame, end_frame)
download_urls = await upload_images_to_comfyapi(
stacked_input_images,
max_images=2,
mime_type="image/png",
auth_kwargs=auth_kwargs,
auth_kwargs=kwargs,
)
if len(download_urls) != 2:
raise RunwayApiError("Failed to upload one or more images to comfy api.")
return comfy_io.NodeOutput(
await generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen3a_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
),
RunwayPromptImageDetailedObject(
uri=str(download_urls[1]), position="last"
),
]
),
return await self.generate_video(
RunwayImageToVideoRequest(
promptText=prompt,
seed=seed,
model=Model("gen3a_turbo"),
duration=Duration(duration),
ratio=AspectRatio(ratio),
promptImage=RunwayPromptImageObject(
root=[
RunwayPromptImageDetailedObject(
uri=str(download_urls[0]), position="first"
),
RunwayPromptImageDetailedObject(
uri=str(download_urls[1]), position="last"
),
]
),
auth_kwargs=auth_kwargs,
node_id=cls.hidden.unique_id,
estimated_duration=AVERAGE_DURATION_FLF_SECONDS,
)
),
auth_kwargs=kwargs,
node_id=unique_id,
)
class RunwayTextToImageNode(comfy_io.ComfyNode):
class RunwayTextToImageNode(ComfyNodeABC):
"""Runway Text to Image Node."""
RETURN_TYPES = ("IMAGE",)
FUNCTION = "api_call"
CATEGORY = "api node/image/Runway"
API_NODE = True
DESCRIPTION = "Generate an image from a text prompt using Runway's Gen 4 model. You can also include reference images to guide the generation."
@classmethod
def define_schema(cls):
return comfy_io.Schema(
node_id="RunwayTextToImageNode",
display_name="Runway Text to Image",
category="api node/image/Runway",
description="Generate an image from a text prompt using Runway's Gen 4 model. "
"You can also include reference image to guide the generation.",
inputs=[
comfy_io.String.Input(
"prompt",
multiline=True,
default="",
tooltip="Text prompt for the generation",
def INPUT_TYPES(s):
return {
"required": {
"prompt": model_field_to_node_input(
IO.STRING, RunwayTextToImageRequest, "promptText", multiline=True
),
comfy_io.Combo.Input(
"ratio": model_field_to_node_input(
IO.COMBO,
RunwayTextToImageRequest,
"ratio",
options=[model.value for model in RunwayTextToImageAspectRatioEnum],
enum_type=RunwayTextToImageAspectRatioEnum,
),
comfy_io.Image.Input(
"reference_image",
tooltip="Optional reference image to guide the generation",
optional=True,
),
],
outputs=[
comfy_io.Image.Output(),
],
hidden=[
comfy_io.Hidden.auth_token_comfy_org,
comfy_io.Hidden.api_key_comfy_org,
comfy_io.Hidden.unique_id,
],
is_api_node=True,
},
"optional": {
"reference_image": (
IO.IMAGE,
{"tooltip": "Optional reference image to guide the generation"},
)
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
def validate_task_created(self, response: RunwayTextToImageResponse) -> bool:
"""
Validate the task creation response from the Runway API matches
expected format.
"""
if not bool(response.id):
raise RunwayApiError("Invalid initial response from Runway API.")
return True
def validate_response(self, response: TaskStatusResponse) -> bool:
"""
Validate the successful task status response from the Runway API
matches expected format.
"""
if not response.output or len(response.output) == 0:
raise RunwayApiError(
"Runway task succeeded but no image data found in response."
)
return True
async def get_response(
self, task_id: str, auth_kwargs: dict[str, str], node_id: Optional[str] = None
) -> TaskStatusResponse:
"""Poll the task status until it is finished then get the response."""
return await poll_until_finished(
auth_kwargs,
ApiEndpoint(
path=f"{PATH_GET_TASK_STATUS}/{task_id}",
method=HttpMethod.GET,
request_model=EmptyRequest,
response_model=TaskStatusResponse,
),
estimated_duration=AVERAGE_DURATION_T2I_SECONDS,
node_id=node_id,
)
@classmethod
async def execute(
cls,
async def api_call(
self,
prompt: str,
ratio: str,
reference_image: Optional[torch.Tensor] = None,
) -> comfy_io.NodeOutput:
unique_id: Optional[str] = None,
**kwargs,
) -> tuple[torch.Tensor]:
# Validate inputs
validate_string(prompt, min_length=1)
auth_kwargs = {
"auth_token": cls.hidden.auth_token_comfy_org,
"comfy_api_key": cls.hidden.api_key_comfy_org,
}
# Prepare reference images if provided
reference_images = None
if reference_image is not None:
validate_image_dimensions(reference_image, max_width=7999, max_height=7999)
validate_image_aspect_ratio(reference_image, min_aspect_ratio=0.5, max_aspect_ratio=2.0)
validate_input_image(reference_image)
download_urls = await upload_images_to_comfyapi(
reference_image,
max_images=1,
mime_type="image/png",
auth_kwargs=auth_kwargs,
auth_kwargs=kwargs,
)
if len(download_urls) != 1:
raise RunwayApiError("Failed to upload reference image to comfy api.")
reference_images = [ReferenceImage(uri=str(download_urls[0]))]
# Create request
request = RunwayTextToImageRequest(
promptText=prompt,
model=Model4.gen4_image,
@@ -565,6 +593,7 @@ class RunwayTextToImageNode(comfy_io.ComfyNode):
referenceImages=reference_images,
)
# Execute initial request
initial_operation = SynchronousOperation(
endpoint=ApiEndpoint(
path=PATH_TEXT_TO_IMAGE,
@@ -573,33 +602,34 @@ class RunwayTextToImageNode(comfy_io.ComfyNode):
response_model=RunwayTextToImageResponse,
),
request=request,
auth_kwargs=auth_kwargs,
auth_kwargs=kwargs,
)
initial_response = await initial_operation.execute()
self.validate_task_created(initial_response)
task_id = initial_response.id
# Poll for completion
final_response = await get_response(
initial_response.id,
auth_kwargs=auth_kwargs,
node_id=cls.hidden.unique_id,
estimated_duration=AVERAGE_DURATION_T2I_SECONDS,
final_response = await self.get_response(
task_id, auth_kwargs=kwargs, node_id=unique_id
)
if not final_response.output:
raise RunwayApiError("Runway task succeeded but no image data found in response.")
self.validate_response(final_response)
return comfy_io.NodeOutput(await download_url_to_image_tensor(get_image_url_from_task_status(final_response)))
# Download and return image
image_url = get_image_url_from_task_status(final_response)
return (await download_url_to_image_tensor(image_url),)
class RunwayExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[comfy_io.ComfyNode]]:
return [
RunwayFirstLastFrameNode,
RunwayImageToVideoNodeGen3a,
RunwayImageToVideoNodeGen4,
RunwayTextToImageNode,
]
NODE_CLASS_MAPPINGS = {
"RunwayFirstLastFrameNode": RunwayFirstLastFrameNode,
"RunwayImageToVideoNodeGen3a": RunwayImageToVideoNodeGen3a,
"RunwayImageToVideoNodeGen4": RunwayImageToVideoNodeGen4,
"RunwayTextToImageNode": RunwayTextToImageNode,
}
async def comfy_entrypoint() -> RunwayExtension:
return RunwayExtension()
NODE_DISPLAY_NAME_MAPPINGS = {
"RunwayFirstLastFrameNode": "Runway First-Last-Frame to Video",
"RunwayImageToVideoNodeGen3a": "Runway Image to Video (Gen3a Turbo)",
"RunwayImageToVideoNodeGen4": "Runway Image to Video (Gen4 Turbo)",
"RunwayTextToImageNode": "Runway Text to Image",
}

View File

@@ -181,8 +181,9 @@ class WebUIProgressHandler(ProgressHandler):
}
# Send a combined progress_state message with all node states
# Include client_id to ensure message is only sent to the initiating client
self.server_instance.send_sync(
"progress_state", {"prompt_id": prompt_id, "nodes": active_nodes}
"progress_state", {"prompt_id": prompt_id, "nodes": active_nodes}, self.server_instance.client_id
)
@override

View File

@@ -1,10 +1,6 @@
#from: https://research.nvidia.com/labs/toronto-ai/AlignYourSteps/howto.html
import numpy as np
import torch
from typing_extensions import override
from comfy_api.latest import ComfyExtension, io
def loglinear_interp(t_steps, num_steps):
"""
@@ -23,30 +19,25 @@ NOISE_LEVELS = {"SD1": [14.6146412293, 6.4745760956, 3.8636745985, 2.694615152
"SDXL":[14.6146412293, 6.3184485287, 3.7681790315, 2.1811480769, 1.3405244945, 0.8620721141, 0.5550693289, 0.3798540708, 0.2332364134, 0.1114188177, 0.0291671582],
"SVD": [700.00, 54.5, 15.886, 7.977, 4.248, 1.789, 0.981, 0.403, 0.173, 0.034, 0.002]}
class AlignYourStepsScheduler(io.ComfyNode):
class AlignYourStepsScheduler:
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="AlignYourStepsScheduler",
category="sampling/custom_sampling/schedulers",
inputs=[
io.Combo.Input("model_type", options=["SD1", "SDXL", "SVD"]),
io.Int.Input("steps", default=10, min=1, max=10000),
io.Float.Input("denoise", default=1.0, min=0.0, max=1.0, step=0.01),
],
outputs=[io.Sigmas.Output()],
)
def INPUT_TYPES(s):
return {"required":
{"model_type": (["SD1", "SDXL", "SVD"], ),
"steps": ("INT", {"default": 10, "min": 1, "max": 10000}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
}
}
RETURN_TYPES = ("SIGMAS",)
CATEGORY = "sampling/custom_sampling/schedulers"
FUNCTION = "get_sigmas"
def get_sigmas(self, model_type, steps, denoise):
# Deprecated: use the V3 schema's `execute` method instead of this.
return AlignYourStepsScheduler().execute(model_type, steps, denoise).result
@classmethod
def execute(cls, model_type, steps, denoise) -> io.NodeOutput:
total_steps = steps
if denoise < 1.0:
if denoise <= 0.0:
return io.NodeOutput(torch.FloatTensor([]))
return (torch.FloatTensor([]),)
total_steps = round(steps * denoise)
sigmas = NOISE_LEVELS[model_type][:]
@@ -55,15 +46,8 @@ class AlignYourStepsScheduler(io.ComfyNode):
sigmas = sigmas[-(total_steps + 1):]
sigmas[-1] = 0
return io.NodeOutput(torch.FloatTensor(sigmas))
return (torch.FloatTensor(sigmas), )
class AlignYourStepsExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[io.ComfyNode]]:
return [
AlignYourStepsScheduler,
]
async def comfy_entrypoint() -> AlignYourStepsExtension:
return AlignYourStepsExtension()
NODE_CLASS_MAPPINGS = {
"AlignYourStepsScheduler": AlignYourStepsScheduler,
}

View File

@@ -105,7 +105,7 @@ class FluxKontextMultiReferenceLatentMethod:
def INPUT_TYPES(s):
return {"required": {
"conditioning": ("CONDITIONING", ),
"reference_latents_method": (("offset", "index", "uxo/uno"), ),
"reference_latents_method": (("offset", "index", "uso"), ),
}}
RETURN_TYPES = ("CONDITIONING",)
@@ -115,8 +115,6 @@ class FluxKontextMultiReferenceLatentMethod:
CATEGORY = "advanced/conditioning/flux"
def append(self, conditioning, reference_latents_method):
if "uxo" in reference_latents_method or "uso" in reference_latents_method:
reference_latents_method = "uxo"
c = node_helpers.conditioning_set_values(conditioning, {"reference_latents_method": reference_latents_method})
return (c, )

View File

@@ -625,37 +625,6 @@ class ImageFlip:
return (image,)
class ImageScaleToMaxDimension:
upscale_methods = ["area", "lanczos", "bilinear", "nearest-exact", "bilinear", "bicubic"]
@classmethod
def INPUT_TYPES(s):
return {"required": {"image": ("IMAGE",),
"upscale_method": (s.upscale_methods,),
"largest_size": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1})}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "upscale"
CATEGORY = "image/upscaling"
def upscale(self, image, upscale_method, largest_size):
height = image.shape[1]
width = image.shape[2]
if height > width:
width = round((width / height) * largest_size)
height = largest_size
elif width > height:
height = round((height / width) * largest_size)
width = largest_size
else:
height = largest_size
width = largest_size
samples = image.movedim(-1, 1)
s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled")
s = s.movedim(1, -1)
return (s,)
NODE_CLASS_MAPPINGS = {
"ImageCrop": ImageCrop,
@@ -670,5 +639,4 @@ NODE_CLASS_MAPPINGS = {
"GetImageSize": GetImageSize,
"ImageRotate": ImageRotate,
"ImageFlip": ImageFlip,
"ImageScaleToMaxDimension": ImageScaleToMaxDimension,
}

View File

@@ -1,5 +1,4 @@
import torch
from torch import nn
import folder_paths
import comfy.utils
import comfy.ops
@@ -59,136 +58,6 @@ class QwenImageBlockWiseControlNet(torch.nn.Module):
return self.controlnet_blocks[block_id](img, controlnet_conditioning)
class SigLIPMultiFeatProjModel(torch.nn.Module):
"""
SigLIP Multi-Feature Projection Model for processing style features from different layers
and projecting them into a unified hidden space.
Args:
siglip_token_nums (int): Number of SigLIP tokens, default 257
style_token_nums (int): Number of style tokens, default 256
siglip_token_dims (int): Dimension of SigLIP tokens, default 1536
hidden_size (int): Hidden layer size, default 3072
context_layer_norm (bool): Whether to use context layer normalization, default False
"""
def __init__(
self,
siglip_token_nums: int = 729,
style_token_nums: int = 64,
siglip_token_dims: int = 1152,
hidden_size: int = 3072,
context_layer_norm: bool = True,
device=None, dtype=None, operations=None
):
super().__init__()
# High-level feature processing (layer -2)
self.high_embedding_linear = nn.Sequential(
operations.Linear(siglip_token_nums, style_token_nums),
nn.SiLU()
)
self.high_layer_norm = (
operations.LayerNorm(siglip_token_dims) if context_layer_norm else nn.Identity()
)
self.high_projection = operations.Linear(siglip_token_dims, hidden_size, bias=True)
# Mid-level feature processing (layer -11)
self.mid_embedding_linear = nn.Sequential(
operations.Linear(siglip_token_nums, style_token_nums),
nn.SiLU()
)
self.mid_layer_norm = (
operations.LayerNorm(siglip_token_dims) if context_layer_norm else nn.Identity()
)
self.mid_projection = operations.Linear(siglip_token_dims, hidden_size, bias=True)
# Low-level feature processing (layer -20)
self.low_embedding_linear = nn.Sequential(
operations.Linear(siglip_token_nums, style_token_nums),
nn.SiLU()
)
self.low_layer_norm = (
operations.LayerNorm(siglip_token_dims) if context_layer_norm else nn.Identity()
)
self.low_projection = operations.Linear(siglip_token_dims, hidden_size, bias=True)
def forward(self, siglip_outputs):
"""
Forward pass function
Args:
siglip_outputs: Output from SigLIP model, containing hidden_states
Returns:
torch.Tensor: Concatenated multi-layer features with shape [bs, 3*style_token_nums, hidden_size]
"""
dtype = next(self.high_embedding_linear.parameters()).dtype
# Process high-level features (layer -2)
high_embedding = self._process_layer_features(
siglip_outputs[2],
self.high_embedding_linear,
self.high_layer_norm,
self.high_projection,
dtype
)
# Process mid-level features (layer -11)
mid_embedding = self._process_layer_features(
siglip_outputs[1],
self.mid_embedding_linear,
self.mid_layer_norm,
self.mid_projection,
dtype
)
# Process low-level features (layer -20)
low_embedding = self._process_layer_features(
siglip_outputs[0],
self.low_embedding_linear,
self.low_layer_norm,
self.low_projection,
dtype
)
# Concatenate features from all layersmodel_patch
return torch.cat((high_embedding, mid_embedding, low_embedding), dim=1)
def _process_layer_features(
self,
hidden_states: torch.Tensor,
embedding_linear: nn.Module,
layer_norm: nn.Module,
projection: nn.Module,
dtype: torch.dtype
) -> torch.Tensor:
"""
Helper function to process features from a single layer
Args:
hidden_states: Input hidden states [bs, seq_len, dim]
embedding_linear: Embedding linear layer
layer_norm: Layer normalization
projection: Projection layer
dtype: Target data type
Returns:
torch.Tensor: Processed features [bs, style_token_nums, hidden_size]
"""
# Transform dimensions: [bs, seq_len, dim] -> [bs, dim, seq_len] -> [bs, dim, style_token_nums] -> [bs, style_token_nums, dim]
embedding = embedding_linear(
hidden_states.to(dtype).transpose(1, 2)
).transpose(1, 2)
# Apply layer normalization
embedding = layer_norm(embedding)
# Project to target hidden space
embedding = projection(embedding)
return embedding
class ModelPatchLoader:
@classmethod
def INPUT_TYPES(s):
@@ -204,14 +73,9 @@ class ModelPatchLoader:
model_patch_path = folder_paths.get_full_path_or_raise("model_patches", name)
sd = comfy.utils.load_torch_file(model_patch_path, safe_load=True)
dtype = comfy.utils.weight_dtype(sd)
if 'controlnet_blocks.0.y_rms.weight' in sd:
additional_in_dim = sd["img_in.weight"].shape[1] - 64
model = QwenImageBlockWiseControlNet(additional_in_dim=additional_in_dim, device=comfy.model_management.unet_offload_device(), dtype=dtype, operations=comfy.ops.manual_cast)
elif 'feature_embedder.mid_layer_norm.bias' in sd:
sd = comfy.utils.state_dict_prefix_replace(sd, {"feature_embedder.": ""}, filter_keys=True)
model = SigLIPMultiFeatProjModel(device=comfy.model_management.unet_offload_device(), dtype=dtype, operations=comfy.ops.manual_cast)
# TODO: this node will work with more types of model patches
additional_in_dim = sd["img_in.weight"].shape[1] - 64
model = QwenImageBlockWiseControlNet(additional_in_dim=additional_in_dim, device=comfy.model_management.unet_offload_device(), dtype=dtype, operations=comfy.ops.manual_cast)
model.load_state_dict(sd)
model = comfy.model_patcher.ModelPatcher(model, load_device=comfy.model_management.get_torch_device(), offload_device=comfy.model_management.unet_offload_device())
return (model,)
@@ -293,51 +157,7 @@ class QwenImageDiffsynthControlnet:
return (model_patched,)
class UsoStyleProjectorPatch:
def __init__(self, model_patch, encoded_image):
self.model_patch = model_patch
self.encoded_image = encoded_image
def __call__(self, kwargs):
txt_ids = kwargs.get("txt_ids")
txt = kwargs.get("txt")
siglip_embedding = self.model_patch.model(self.encoded_image.to(txt.dtype)).to(txt.dtype)
txt = torch.cat([siglip_embedding, txt], dim=1)
kwargs['txt'] = txt
kwargs['txt_ids'] = torch.cat([torch.zeros(siglip_embedding.shape[0], siglip_embedding.shape[1], 3, dtype=txt_ids.dtype, device=txt_ids.device), txt_ids], dim=1)
return kwargs
def to(self, device_or_dtype):
if isinstance(device_or_dtype, torch.device):
self.encoded_image = self.encoded_image.to(device_or_dtype)
return self
def models(self):
return [self.model_patch]
class USOStyleReference:
@classmethod
def INPUT_TYPES(s):
return {"required": {"model": ("MODEL",),
"model_patch": ("MODEL_PATCH",),
"clip_vision_output": ("CLIP_VISION_OUTPUT", ),
}}
RETURN_TYPES = ("MODEL",)
FUNCTION = "apply_patch"
EXPERIMENTAL = True
CATEGORY = "advanced/model_patches/flux"
def apply_patch(self, model, model_patch, clip_vision_output):
encoded_image = torch.stack((clip_vision_output.all_hidden_states[:, -20], clip_vision_output.all_hidden_states[:, -11], clip_vision_output.penultimate_hidden_states))
model_patched = model.clone()
model_patched.set_model_post_input_patch(UsoStyleProjectorPatch(model_patch, encoded_image))
return (model_patched,)
NODE_CLASS_MAPPINGS = {
"ModelPatchLoader": ModelPatchLoader,
"QwenImageDiffsynthControlnet": QwenImageDiffsynthControlnet,
"USOStyleReference": USOStyleReference,
}

View File

@@ -1,3 +1,3 @@
# This file is automatically generated by the build process when version is
# updated in pyproject.toml.
__version__ = "0.3.57"
__version__ = "0.3.56"

View File

@@ -2344,7 +2344,6 @@ async def init_builtin_api_nodes():
"nodes_veo2.py",
"nodes_kling.py",
"nodes_bfl.py",
"nodes_bytedance.py",
"nodes_luma.py",
"nodes_recraft.py",
"nodes_pixverse.py",

View File

@@ -1,6 +1,6 @@
[project]
name = "ComfyUI"
version = "0.3.57"
version = "0.3.56"
readme = "README.md"
license = { file = "LICENSE" }
requires-python = ">=3.9"

View File

@@ -1,5 +1,5 @@
comfyui-frontend-package==1.25.11
comfyui-workflow-templates==0.1.75
comfyui-workflow-templates==0.1.70
comfyui-embedded-docs==0.2.6
torch
torchsde

View File

@@ -3,7 +3,11 @@ from urllib import request
#This is the ComfyUI api prompt format.
#If you want it for a specific workflow you can "File -> Export (API)" in the interface.
#If you want it for a specific workflow you can "enable dev mode options"
#in the settings of the UI (gear beside the "Queue Size: ") this will enable
#a button on the UI to save workflows in api format.
#keep in mind ComfyUI is pre alpha software so this format will change a bit.
#this is the one for the default workflow
prompt_text = """

View File

@@ -729,34 +729,7 @@ class PromptServer():
@routes.post("/interrupt")
async def post_interrupt(request):
try:
json_data = await request.json()
except json.JSONDecodeError:
json_data = {}
# Check if a specific prompt_id was provided for targeted interruption
prompt_id = json_data.get('prompt_id')
if prompt_id:
currently_running, _ = self.prompt_queue.get_current_queue()
# Check if the prompt_id matches any currently running prompt
should_interrupt = False
for item in currently_running:
# item structure: (number, prompt_id, prompt, extra_data, outputs_to_execute)
if item[1] == prompt_id:
logging.info(f"Interrupting prompt {prompt_id}")
should_interrupt = True
break
if should_interrupt:
nodes.interrupt_processing()
else:
logging.info(f"Prompt {prompt_id} is not currently running, skipping interrupt")
else:
# No prompt_id provided, do a global interrupt
logging.info("Global interrupt (no prompt_id specified)")
nodes.interrupt_processing()
nodes.interrupt_processing()
return web.Response(status=200)
@routes.post("/free")

View File

@@ -6,6 +6,7 @@ def pytest_addoption(parser):
parser.addoption('--output_dir', action="store", default='tests/inference/samples', help='Output directory for generated images')
parser.addoption("--listen", type=str, default="127.0.0.1", metavar="IP", nargs="?", const="0.0.0.0", help="Specify the IP address to listen on (default: 127.0.0.1). If --listen is provided without an argument, it defaults to 0.0.0.0. (listens on all)")
parser.addoption("--port", type=int, default=8188, help="Set the listen port.")
parser.addoption("--skip-timing-checks", action="store_true", default=False, help="Skip timing-related assertions in tests (useful for CI environments with variable performance)")
# This initializes args at the beginning of the test session
@pytest.fixture(scope="session", autouse=True)
@@ -19,6 +20,11 @@ def args_pytest(pytestconfig):
return args
@pytest.fixture(scope="session")
def skip_timing_checks(pytestconfig):
"""Fixture that returns whether timing checks should be skipped."""
return pytestconfig.getoption("--skip-timing-checks")
def pytest_collection_modifyitems(items):
# Modifies items so tests run in the correct order

View File

@@ -7,7 +7,7 @@ import subprocess
from pytest import fixture
from comfy_execution.graph_utils import GraphBuilder
from tests.inference.test_execution import ComfyClient, run_warmup
from tests.execution.test_execution import ComfyClient, run_warmup
@pytest.mark.execution
@@ -23,7 +23,7 @@ class TestAsyncNodes:
'--output-directory', args_pytest["output_dir"],
'--listen', args_pytest["listen"],
'--port', str(args_pytest["port"]),
'--extra-model-paths-config', 'tests/inference/extra_model_paths.yaml',
'--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml',
'--cpu',
]
use_lru, lru_size = request.param
@@ -81,7 +81,7 @@ class TestAsyncNodes:
assert len(result_images) == 1, "Should have 1 image"
assert np.array(result_images[0]).min() == 0 and np.array(result_images[0]).max() == 0, "Image should be black"
def test_multiple_async_parallel_execution(self, client: ComfyClient, builder: GraphBuilder):
def test_multiple_async_parallel_execution(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
"""Test that multiple async nodes execute in parallel."""
# Warmup execution to ensure server is fully initialized
run_warmup(client)
@@ -104,7 +104,8 @@ class TestAsyncNodes:
elapsed_time = time.time() - start_time
# Should take ~0.5s (max duration) not 1.2s (sum of durations)
assert elapsed_time < 0.8, f"Parallel execution took {elapsed_time}s, expected < 0.8s"
if not skip_timing_checks:
assert elapsed_time < 0.8, f"Parallel execution took {elapsed_time}s, expected < 0.8s"
# Verify all nodes executed
assert result.did_run(sleep1) and result.did_run(sleep2) and result.did_run(sleep3)
@@ -150,7 +151,7 @@ class TestAsyncNodes:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
def test_async_lazy_evaluation(self, client: ComfyClient, builder: GraphBuilder):
def test_async_lazy_evaluation(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
"""Test async nodes with lazy evaluation."""
# Warmup execution to ensure server is fully initialized
run_warmup(client, prefix="warmup_lazy")
@@ -173,7 +174,8 @@ class TestAsyncNodes:
elapsed_time = time.time() - start_time
# Should only execute sleep1, not sleep2
assert elapsed_time < 0.5, f"Should skip sleep2, took {elapsed_time}s"
if not skip_timing_checks:
assert elapsed_time < 0.5, f"Should skip sleep2, took {elapsed_time}s"
assert result.did_run(sleep1), "Sleep1 should have executed"
assert not result.did_run(sleep2), "Sleep2 should have been skipped"
@@ -310,7 +312,7 @@ class TestAsyncNodes:
images = result.get_images(output)
assert len(images) == 1, "Should have blocked second image"
def test_async_caching_behavior(self, client: ComfyClient, builder: GraphBuilder):
def test_async_caching_behavior(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
"""Test that async nodes are properly cached."""
# Warmup execution to ensure server is fully initialized
run_warmup(client, prefix="warmup_cache")
@@ -330,9 +332,10 @@ class TestAsyncNodes:
elapsed_time = time.time() - start_time
assert not result2.did_run(sleep_node), "Should be cached"
assert elapsed_time < 0.1, f"Cached run took {elapsed_time}s, should be instant"
if not skip_timing_checks:
assert elapsed_time < 0.1, f"Cached run took {elapsed_time}s, should be instant"
def test_async_with_dynamic_prompts(self, client: ComfyClient, builder: GraphBuilder):
def test_async_with_dynamic_prompts(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
"""Test async nodes within dynamically generated prompts."""
# Warmup execution to ensure server is fully initialized
run_warmup(client, prefix="warmup_dynamic")
@@ -345,8 +348,8 @@ class TestAsyncNodes:
dynamic_async = g.node("TestDynamicAsyncGeneration",
image1=image1.out(0),
image2=image2.out(0),
num_async_nodes=3,
sleep_duration=0.2)
num_async_nodes=5,
sleep_duration=0.4)
g.node("SaveImage", images=dynamic_async.out(0))
start_time = time.time()
@@ -354,7 +357,8 @@ class TestAsyncNodes:
elapsed_time = time.time() - start_time
# Should execute async nodes in parallel within dynamic prompt
assert elapsed_time < 0.5, f"Dynamic async execution took {elapsed_time}s"
if not skip_timing_checks:
assert elapsed_time < 1.0, f"Dynamic async execution took {elapsed_time}s"
assert result.did_run(dynamic_async)
def test_async_resource_cleanup(self, client: ComfyClient, builder: GraphBuilder):

View File

@@ -149,7 +149,7 @@ class TestExecution:
'--output-directory', args_pytest["output_dir"],
'--listen', args_pytest["listen"],
'--port', str(args_pytest["port"]),
'--extra-model-paths-config', 'tests/inference/extra_model_paths.yaml',
'--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml',
'--cpu',
]
use_lru, lru_size = request.param
@@ -518,7 +518,7 @@ class TestExecution:
assert numpy.array(images[0]).min() == 63 and numpy.array(images[0]).max() == 63, "Image should have value 0.25"
assert not result.did_run(test_node), "The execution should have been cached"
def test_parallel_sleep_nodes(self, client: ComfyClient, builder: GraphBuilder):
def test_parallel_sleep_nodes(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
# Warmup execution to ensure server is fully initialized
run_warmup(client)
@@ -541,14 +541,15 @@ class TestExecution:
# The test should take around 3.0 seconds (the longest sleep duration)
# plus some overhead, but definitely less than the sum of all sleeps (9.0s)
assert elapsed_time < 8.9, f"Parallel execution took {elapsed_time}s, expected less than 8.9s"
if not skip_timing_checks:
assert elapsed_time < 8.9, f"Parallel execution took {elapsed_time}s, expected less than 8.9s"
# Verify that all nodes executed
assert result.did_run(sleep_node1), "Sleep node 1 should have run"
assert result.did_run(sleep_node2), "Sleep node 2 should have run"
assert result.did_run(sleep_node3), "Sleep node 3 should have run"
def test_parallel_sleep_expansion(self, client: ComfyClient, builder: GraphBuilder):
def test_parallel_sleep_expansion(self, client: ComfyClient, builder: GraphBuilder, skip_timing_checks):
# Warmup execution to ensure server is fully initialized
run_warmup(client)
@@ -574,7 +575,9 @@ class TestExecution:
# Similar to the previous test, expect parallel execution of the sleep nodes
# which should complete in less than the sum of all sleeps
assert elapsed_time < 10.0, f"Expansion execution took {elapsed_time}s, expected less than 5.5s"
# Lots of leeway here since Windows CI is slow
if not skip_timing_checks:
assert elapsed_time < 13.0, f"Expansion execution took {elapsed_time}s"
# Verify the parallel sleep node executed
assert result.did_run(parallel_sleep), "ParallelSleep node should have run"

View File

@@ -0,0 +1,233 @@
"""Test that progress updates are properly isolated between WebSocket clients."""
import json
import pytest
import time
import threading
import uuid
import websocket
from typing import List, Dict, Any
from comfy_execution.graph_utils import GraphBuilder
from tests.execution.test_execution import ComfyClient
class ProgressTracker:
"""Tracks progress messages received by a WebSocket client."""
def __init__(self, client_id: str):
self.client_id = client_id
self.progress_messages: List[Dict[str, Any]] = []
self.lock = threading.Lock()
def add_message(self, message: Dict[str, Any]):
"""Thread-safe addition of progress messages."""
with self.lock:
self.progress_messages.append(message)
def get_messages_for_prompt(self, prompt_id: str) -> List[Dict[str, Any]]:
"""Get all progress messages for a specific prompt_id."""
with self.lock:
return [
msg for msg in self.progress_messages
if msg.get('data', {}).get('prompt_id') == prompt_id
]
def has_cross_contamination(self, own_prompt_id: str) -> bool:
"""Check if this client received progress for other prompts."""
with self.lock:
for msg in self.progress_messages:
msg_prompt_id = msg.get('data', {}).get('prompt_id')
if msg_prompt_id and msg_prompt_id != own_prompt_id:
return True
return False
class IsolatedClient(ComfyClient):
"""Extended ComfyClient that tracks all WebSocket messages."""
def __init__(self):
super().__init__()
self.progress_tracker = None
self.all_messages: List[Dict[str, Any]] = []
def connect(self, listen='127.0.0.1', port=8188, client_id=None):
"""Connect with a specific client_id and set up message tracking."""
if client_id is None:
client_id = str(uuid.uuid4())
super().connect(listen, port, client_id)
self.progress_tracker = ProgressTracker(client_id)
def listen_for_messages(self, duration: float = 5.0):
"""Listen for WebSocket messages for a specified duration."""
end_time = time.time() + duration
self.ws.settimeout(0.5) # Non-blocking with timeout
while time.time() < end_time:
try:
out = self.ws.recv()
if isinstance(out, str):
message = json.loads(out)
self.all_messages.append(message)
# Track progress_state messages
if message.get('type') == 'progress_state':
self.progress_tracker.add_message(message)
except websocket.WebSocketTimeoutException:
continue
except Exception:
# Log error silently in test context
break
@pytest.mark.execution
class TestProgressIsolation:
"""Test suite for verifying progress update isolation between clients."""
@pytest.fixture(scope="class", autouse=True)
def _server(self, args_pytest):
"""Start the ComfyUI server for testing."""
import subprocess
pargs = [
'python', 'main.py',
'--output-directory', args_pytest["output_dir"],
'--listen', args_pytest["listen"],
'--port', str(args_pytest["port"]),
'--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml',
'--cpu',
]
p = subprocess.Popen(pargs)
yield
p.kill()
def start_client_with_retry(self, listen: str, port: int, client_id: str = None):
"""Start client with connection retries."""
client = IsolatedClient()
# Connect to server (with retries)
n_tries = 5
for i in range(n_tries):
time.sleep(4)
try:
client.connect(listen, port, client_id)
return client
except ConnectionRefusedError as e:
print(e) # noqa: T201
print(f"({i+1}/{n_tries}) Retrying...") # noqa: T201
raise ConnectionRefusedError(f"Failed to connect after {n_tries} attempts")
def test_progress_isolation_between_clients(self, args_pytest):
"""Test that progress updates are isolated between different clients."""
listen = args_pytest["listen"]
port = args_pytest["port"]
# Create two separate clients with unique IDs
client_a_id = "client_a_" + str(uuid.uuid4())
client_b_id = "client_b_" + str(uuid.uuid4())
try:
# Connect both clients with retries
client_a = self.start_client_with_retry(listen, port, client_a_id)
client_b = self.start_client_with_retry(listen, port, client_b_id)
# Create simple workflows for both clients
graph_a = GraphBuilder(prefix="client_a")
image_a = graph_a.node("StubImage", content="BLACK", height=256, width=256, batch_size=1)
graph_a.node("PreviewImage", images=image_a.out(0))
graph_b = GraphBuilder(prefix="client_b")
image_b = graph_b.node("StubImage", content="WHITE", height=256, width=256, batch_size=1)
graph_b.node("PreviewImage", images=image_b.out(0))
# Submit workflows from both clients
prompt_a = graph_a.finalize()
prompt_b = graph_b.finalize()
response_a = client_a.queue_prompt(prompt_a)
prompt_id_a = response_a['prompt_id']
response_b = client_b.queue_prompt(prompt_b)
prompt_id_b = response_b['prompt_id']
# Start threads to listen for messages on both clients
def listen_client_a():
client_a.listen_for_messages(duration=10.0)
def listen_client_b():
client_b.listen_for_messages(duration=10.0)
thread_a = threading.Thread(target=listen_client_a)
thread_b = threading.Thread(target=listen_client_b)
thread_a.start()
thread_b.start()
# Wait for threads to complete
thread_a.join()
thread_b.join()
# Verify isolation
# Client A should only receive progress for prompt_id_a
assert not client_a.progress_tracker.has_cross_contamination(prompt_id_a), \
f"Client A received progress updates for other clients' workflows. " \
f"Expected only {prompt_id_a}, but got messages for multiple prompts."
# Client B should only receive progress for prompt_id_b
assert not client_b.progress_tracker.has_cross_contamination(prompt_id_b), \
f"Client B received progress updates for other clients' workflows. " \
f"Expected only {prompt_id_b}, but got messages for multiple prompts."
# Verify each client received their own progress updates
client_a_messages = client_a.progress_tracker.get_messages_for_prompt(prompt_id_a)
client_b_messages = client_b.progress_tracker.get_messages_for_prompt(prompt_id_b)
assert len(client_a_messages) > 0, \
"Client A did not receive any progress updates for its own workflow"
assert len(client_b_messages) > 0, \
"Client B did not receive any progress updates for its own workflow"
# Ensure no cross-contamination
client_a_other = client_a.progress_tracker.get_messages_for_prompt(prompt_id_b)
client_b_other = client_b.progress_tracker.get_messages_for_prompt(prompt_id_a)
assert len(client_a_other) == 0, \
f"Client A incorrectly received {len(client_a_other)} progress updates for Client B's workflow"
assert len(client_b_other) == 0, \
f"Client B incorrectly received {len(client_b_other)} progress updates for Client A's workflow"
finally:
# Clean up connections
if hasattr(client_a, 'ws'):
client_a.ws.close()
if hasattr(client_b, 'ws'):
client_b.ws.close()
def test_progress_with_missing_client_id(self, args_pytest):
"""Test that progress updates handle missing client_id gracefully."""
listen = args_pytest["listen"]
port = args_pytest["port"]
try:
# Connect client with retries
client = self.start_client_with_retry(listen, port)
# Create a simple workflow
graph = GraphBuilder(prefix="test_missing_id")
image = graph.node("StubImage", content="BLACK", height=128, width=128, batch_size=1)
graph.node("PreviewImage", images=image.out(0))
# Submit workflow
prompt = graph.finalize()
response = client.queue_prompt(prompt)
prompt_id = response['prompt_id']
# Listen for messages
client.listen_for_messages(duration=5.0)
# Should still receive progress updates for own workflow
messages = client.progress_tracker.get_messages_for_prompt(prompt_id)
assert len(messages) > 0, \
"Client did not receive progress updates even though it initiated the workflow"
finally:
if hasattr(client, 'ws'):
client.ws.close()