from __future__ import annotations

import torch

import os
import sys
import json
import glob
import hashlib
import inspect
import traceback
import math
import time
import random
import logging

from PIL import Image, ImageOps, ImageSequence
from PIL.PngImagePlugin import PngInfo

import numpy as np
import safetensors.torch

sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy"))

import comfy.diffusers_load
import comfy.samplers
import comfy.sample
import comfy.sd
import comfy.utils
import comfy.controlnet
from comfy.comfy_types import IO, ComfyNodeABC, InputTypeDict, FileLocator
from comfy_api.internal import register_versions, ComfyAPIWithVersion
from comfy_api.version_list import supported_versions
from comfy_api.latest import io, ComfyExtension

import comfy.clip_vision

import comfy.model_management
from comfy.cli_args import args

import importlib

import folder_paths
import latent_preview
import node_helpers

if args.enable_manager:
    import comfyui_manager


def before_node_execution():
    comfy.model_management.throw_exception_if_processing_interrupted()

def interrupt_processing(value=True):
    comfy.model_management.interrupt_current_processing(value)

MAX_RESOLUTION = 16384

class CLIPTextEncode(ComfyNodeABC):
    @classmethod
    def INPUT_TYPES(s) -> InputTypeDict:
        return {
            "required": {
                "text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}),
                "clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."})
            }
        }
    RETURN_TYPES = (IO.CONDITIONING,)
    OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",)
    FUNCTION = "encode"

    CATEGORY = "conditioning"
    DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images."
    SHORT_DESCRIPTION = "Encodes text prompts using CLIP for guiding diffusion models."
    SEARCH_ALIASES = ["text", "prompt", "text prompt", "positive prompt", "negative prompt", "encode text", "text encoder", "encode prompt"]

    def encode(self, clip, text):
        if clip is None:
            raise RuntimeError("ERROR: clip input is invalid: None\n\nIf the clip is from a checkpoint loader node your checkpoint does not contain a valid clip or text encoder model.")
        tokens = clip.tokenize(text)
        return (clip.encode_from_tokens_scheduled(tokens), )


class ConditioningCombine:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "combine"

    CATEGORY = "conditioning"
    DESCRIPTION = "Combines two conditioning inputs into one by appending them together."
    SHORT_DESCRIPTION = None
    SEARCH_ALIASES = ["combine", "merge conditioning", "combine prompts", "merge prompts", "mix prompts", "add prompt"]

    def combine(self, conditioning_1, conditioning_2):
        return (conditioning_1 + conditioning_2, )

class ConditioningAverage:
    SEARCH_ALIASES = ["blend prompts", "interpolate conditioning", "mix prompts", "style fusion", "weighted blend"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_to": ("CONDITIONING", ),
                             "conditioning_from": ("CONDITIONING", ),
                             "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "addWeighted"

    CATEGORY = "conditioning"
    DESCRIPTION = "Blends two conditioning inputs using a weighted average, allowing smooth interpolation between prompts based on a strength parameter."
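    # Note on the CONDITIONING data structure assumed throughout this file:
    # a conditioning value is a list of [cond_tensor, options_dict] pairs,
    # e.g. (shapes are illustrative, SD1.x-style):
    #
    #   [[torch.zeros(1, 77, 768), {"pooled_output": torch.zeros(1, 768)}]]
    #
    # addWeighted below lerps the token tensors, tw = t1 * s + t0 * (1 - s),
    # padding t0 with zeros when it is shorter than t1 so the shapes line up.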
SHORT_DESCRIPTION = "Blends two conditionings via weighted average interpolation." def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] pooled_output_from = conditioning_from[0][1].get("pooled_output", None) for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) t0 = cond_from[:,:t1.shape[1]] if t0.shape[1] < t1.shape[1]: t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) t_to = conditioning_to[i][1].copy() if pooled_output_from is not None and pooled_output_to is not None: t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) elif pooled_output_from is not None: t_to["pooled_output"] = pooled_output_from n = [tw, t_to] out.append(n) return (out, ) class ConditioningConcat: @classmethod def INPUT_TYPES(s): return {"required": { "conditioning_to": ("CONDITIONING",), "conditioning_from": ("CONDITIONING",), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "concat" CATEGORY = "conditioning" DESCRIPTION = "Concatenates conditioning tokens from one conditioning onto another, extending the token sequence to combine their effects." SHORT_DESCRIPTION = "Concatenates conditioning tokens to combine prompt effects." def concat(self, conditioning_to, conditioning_from): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] tw = torch.cat((t1, cond_from),1) n = [tw, conditioning_to[i][1].copy()] out.append(n) return (out, ) class ConditioningSetArea: SEARCH_ALIASES = ["regional prompt", "area prompt", "spatial conditioning", "localized prompt"] @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" DESCRIPTION = "Sets a rectangular area on conditioning using pixel coordinates, allowing the prompt to apply only to a specific region of the image." SHORT_DESCRIPTION = "Restricts conditioning to a specific pixel-coordinate area." 
    def append(self, conditioning, width, height, x, y, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8),
                                                                "strength": strength,
                                                                "set_area_to_bounds": False})
        return (c, )

class ConditioningSetAreaPercentage:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
                             "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
                             "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
                             "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"
    DESCRIPTION = "Sets a rectangular area on conditioning using percentage-based coordinates, allowing the prompt to apply to a proportional region of the image."
    SHORT_DESCRIPTION = "Restricts conditioning to a percentage-based area."

    def append(self, conditioning, width, height, x, y, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x),
                                                                "strength": strength,
                                                                "set_area_to_bounds": False})
        return (c, )

class ConditioningSetAreaStrength:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"
    DESCRIPTION = "Sets the strength of a conditioning area."
    SHORT_DESCRIPTION = None

    def append(self, conditioning, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
        return (c, )

class ConditioningSetMask:
    SEARCH_ALIASES = ["masked prompt", "regional inpaint conditioning", "mask conditioning"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "mask": ("MASK", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             "set_cond_area": (["default", "mask bounds"],),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"
    DESCRIPTION = "Applies a mask to conditioning so the prompt only affects the masked region, with adjustable strength and optional bounds-based area restriction."
    SHORT_DESCRIPTION = "Applies a mask to limit conditioning to a region."

    def append(self, conditioning, mask, set_cond_area, strength):
        set_area_to_bounds = False
        if set_cond_area != "default":
            set_area_to_bounds = True
        if len(mask.shape) < 3:
            mask = mask.unsqueeze(0)

        c = node_helpers.conditioning_set_values(conditioning, {"mask": mask,
                                                                "set_area_to_bounds": set_area_to_bounds,
                                                                "mask_strength": strength})
        return (c, )

class ConditioningZeroOut:
    SEARCH_ALIASES = ["null conditioning", "clear conditioning"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", )}}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "zero_out"

    CATEGORY = "advanced/conditioning"
    DESCRIPTION = "Zeros out all conditioning tensors including pooled output, producing an empty unconditional embedding."
    SHORT_DESCRIPTION = "Zeros out conditioning to produce an empty embedding."
    def zero_out(self, conditioning):
        c = []
        for t in conditioning:
            d = t[1].copy()
            pooled_output = d.get("pooled_output", None)
            if pooled_output is not None:
                d["pooled_output"] = torch.zeros_like(pooled_output)
            conditioning_lyrics = d.get("conditioning_lyrics", None)
            if conditioning_lyrics is not None:
                d["conditioning_lyrics"] = torch.zeros_like(conditioning_lyrics)
            n = [torch.zeros_like(t[0]), d]
            c.append(n)
        return (c, )

class ConditioningSetTimestepRange:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
                             "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "set_range"

    CATEGORY = "advanced/conditioning"
    DESCRIPTION = "Sets the start and end timestep percentages for conditioning, controlling which portion of the sampling process it is active during."
    SHORT_DESCRIPTION = "Limits conditioning to a specific timestep range."

    def set_range(self, conditioning, start, end):
        c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start,
                                                                "end_percent": end})
        return (c, )

class VAEDecode:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "samples": ("LATENT", {"tooltip": "The latent to be decoded."}),
                "vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."})
            }
        }
    RETURN_TYPES = ("IMAGE",)
    OUTPUT_TOOLTIPS = ("The decoded image.",)
    FUNCTION = "decode"

    CATEGORY = "latent"
    DESCRIPTION = "Decodes latent images back into pixel space images."
    SHORT_DESCRIPTION = None
    SEARCH_ALIASES = ["decode", "decode latent", "latent to image", "render latent"]

    def decode(self, vae, samples):
        latent = samples["samples"]
        if latent.is_nested:
            latent = latent.unbind()[0]
        images = vae.decode(latent)
        if len(images.shape) == 5:  # combine batches
            images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
        return (images, )

class VAEDecodeTiled:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ),
                             "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}),
                             "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
                             "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}),
                             "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
                             }}
    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "decode"

    CATEGORY = "_for_testing"
    DESCRIPTION = "Decodes latent images to pixel space using tiling to reduce memory usage, with configurable tile size, overlap, and temporal settings for video VAEs."
    SHORT_DESCRIPTION = "Decodes latents to images using tiling for lower memory."
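    # Tile sizes are given in pixels and converted to latent units below by
    # dividing by the VAE's spatial compression (8 for SD-family VAEs). For
    # example, a 512px tile with 64px overlap decodes 64x64 latent tiles
    # that overlap by 8 latent cells, trading speed for lower peak memory.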
    def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8):
        if tile_size < overlap * 4:
            overlap = tile_size // 4
        if temporal_size < temporal_overlap * 2:
            temporal_overlap = temporal_size // 2
        temporal_compression = vae.temporal_compression_decode()
        if temporal_compression is not None:
            temporal_size = max(2, temporal_size // temporal_compression)
            temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression))
        else:
            temporal_size = None
            temporal_overlap = None

        compression = vae.spacial_compression_decode()
        images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap)
        if len(images.shape) == 5:  # combine batches
            images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
        return (images, )

class VAEEncode:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "latent"
    DESCRIPTION = "Encodes pixel images into latent space using a VAE model."
    SHORT_DESCRIPTION = None
    SEARCH_ALIASES = ["encode", "encode image", "image to latent"]

    def encode(self, vae, pixels):
        t = vae.encode(pixels)
        return ({"samples": t}, )

class VAEEncodeTiled:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ),
                             "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
                             "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
                             "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}),
                             "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
                             }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "_for_testing"
    DESCRIPTION = "Encodes pixel images into latent space using tiling to reduce memory usage, with configurable tile size, overlap, and temporal settings for video VAEs."
    SHORT_DESCRIPTION = "Encodes images to latents using tiling for lower memory."

    def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8):
        t = vae.encode_tiled(pixels, tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap)
        return ({"samples": t}, )

class VAEEncodeForInpaint:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ),
                              "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "latent/inpaint"
    DESCRIPTION = "Encodes an image into latent space for inpainting by applying and optionally growing a mask, zeroing out masked pixel regions before encoding."
    SHORT_DESCRIPTION = "Encodes images for inpainting with mask-aware encoding."
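    # The mask grow step below is a dilation implemented as a conv2d over an
    # all-ones kernel: any pixel within grow_mask_by of a masked pixel
    # becomes masked after clamping. Sketch of the equivalent operation:
    #
    #   kernel = torch.ones((1, 1, g, g))
    #   padding = math.ceil((g - 1) / 2)
    #   grown = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel, padding=padding), 0, 1)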
    def encode(self, vae, pixels, mask, grow_mask_by=6):
        downscale_ratio = vae.spacial_compression_encode()

        x = (pixels.shape[1] // downscale_ratio) * downscale_ratio
        y = (pixels.shape[2] // downscale_ratio) * downscale_ratio
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

        pixels = pixels.clone()
        if pixels.shape[1] != x or pixels.shape[2] != y:
            x_offset = (pixels.shape[1] % downscale_ratio) // 2
            y_offset = (pixels.shape[2] % downscale_ratio) // 2
            pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
            mask = mask[:, :, x_offset:x + x_offset, y_offset:y + y_offset]

        # grow mask by a few pixels to keep things seamless in latent space
        if grow_mask_by == 0:
            mask_erosion = mask
        else:
            kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
            padding = math.ceil((grow_mask_by - 1) / 2)

            mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)

        m = (1.0 - mask.round()).squeeze(1)
        for i in range(3):
            pixels[:, :, :, i] -= 0.5
            pixels[:, :, :, i] *= m
            pixels[:, :, :, i] += 0.5
        t = vae.encode(pixels)

        return ({"samples": t, "noise_mask": (mask_erosion[:, :, :x, :y].round())}, )

class InpaintModelConditioning:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"positive": ("CONDITIONING", ),
                             "negative": ("CONDITIONING", ),
                             "vae": ("VAE", ),
                             "pixels": ("IMAGE", ),
                             "mask": ("MASK", ),
                             "noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}),
                             }}
    RETURN_TYPES = ("CONDITIONING", "CONDITIONING", "LATENT")
    RETURN_NAMES = ("positive", "negative", "latent")
    FUNCTION = "encode"

    CATEGORY = "conditioning/inpaint"
    DESCRIPTION = "Prepares conditioning for inpaint models by encoding the masked image and concatenating the latent and mask information into the positive and negative conditioning."
    SHORT_DESCRIPTION = "Prepares inpaint model conditioning with mask-aware latents."
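    # Inpaint models expect two latents: the original image latent to start
    # sampling from, and a "concat" latent encoded from the image with the
    # masked pixels neutralized to 0.5 grey. The latter is attached to both
    # conditionings via the concat_latent_image / concat_mask keys below.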
    def encode(self, positive, negative, pixels, vae, mask, noise_mask=True):
        x = (pixels.shape[1] // 8) * 8
        y = (pixels.shape[2] // 8) * 8
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

        orig_pixels = pixels
        pixels = orig_pixels.clone()
        if pixels.shape[1] != x or pixels.shape[2] != y:
            x_offset = (pixels.shape[1] % 8) // 2
            y_offset = (pixels.shape[2] % 8) // 2
            pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
            mask = mask[:, :, x_offset:x + x_offset, y_offset:y + y_offset]

        m = (1.0 - mask.round()).squeeze(1)
        for i in range(3):
            pixels[:, :, :, i] -= 0.5
            pixels[:, :, :, i] *= m
            pixels[:, :, :, i] += 0.5
        concat_latent = vae.encode(pixels)
        orig_latent = vae.encode(orig_pixels)

        out_latent = {}
        out_latent["samples"] = orig_latent
        if noise_mask:
            out_latent["noise_mask"] = mask

        out = []
        for conditioning in [positive, negative]:
            c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
                                                                    "concat_mask": mask})
            out.append(c)
        return (out[0], out[1], out_latent)


class SaveLatent:
    SEARCH_ALIASES = ["export latent"]

    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT", ),
                              "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})},
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }
    RETURN_TYPES = ()
    FUNCTION = "save"
    OUTPUT_NODE = True

    CATEGORY = "_for_testing"
    DESCRIPTION = "Saves latent tensors to a safetensors file in the output directory with optional workflow metadata."
    SHORT_DESCRIPTION = "Saves latent tensors to a safetensors file."

    def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)

        # support saving metadata for latent sharing
        prompt_info = ""
        if prompt is not None:
            prompt_info = json.dumps(prompt)

        metadata = None
        if not args.disable_metadata:
            metadata = {"prompt": prompt_info}
            if extra_pnginfo is not None:
                for x in extra_pnginfo:
                    metadata[x] = json.dumps(extra_pnginfo[x])

        file = f"{filename}_{counter:05}_.latent"

        results: list[FileLocator] = []
        results.append({
            "filename": file,
            "subfolder": subfolder,
            "type": "output"
        })

        file = os.path.join(full_output_folder, file)

        output = {}
        output["latent_tensor"] = samples["samples"].contiguous()
        output["latent_format_version_0"] = torch.tensor([])

        comfy.utils.save_torch_file(output, file, metadata=metadata)
        return {"ui": {"latents": results}}


class LoadLatent:
    SEARCH_ALIASES = ["import latent", "open latent"]

    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
        return {"required": {"latent": [sorted(files), ]}, }

    CATEGORY = "_for_testing"
    DESCRIPTION = "Loads latent tensors from a previously saved safetensors file in the input directory."
    SHORT_DESCRIPTION = "Loads latent tensors from a safetensors file."
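    # Files missing the "latent_format_version_0" marker predate the current
    # format and were saved with the SD1.x scale factor (0.18215) baked in,
    # so load() below multiplies them by 1.0 / 0.18215 to undo it.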
RETURN_TYPES = ("LATENT", ) FUNCTION = "load" def load(self, latent): latent_path = folder_paths.get_annotated_filepath(latent) latent = safetensors.torch.load_file(latent_path, device="cpu") multiplier = 1.0 if "latent_format_version_0" not in latent: multiplier = 1.0 / 0.18215 samples = {"samples": latent["latent_tensor"].float() * multiplier} return (samples, ) @classmethod def IS_CHANGED(s, latent): image_path = folder_paths.get_annotated_filepath(latent) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, latent): if not folder_paths.exists_annotated_filepath(latent): return "Invalid latent file: {}".format(latent) return True class CheckpointLoader: SEARCH_ALIASES = ["load model", "model loader"] @classmethod def INPUT_TYPES(s): return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders" DESCRIPTION = "Loads a checkpoint using a separate model config file. Deprecated in favor of CheckpointLoaderSimple which auto-detects the config." SHORT_DESCRIPTION = "Loads a checkpoint with a manual config file (deprecated)." DEPRECATED = True def load_checkpoint(self, config_name, ckpt_name): config_path = folder_paths.get_full_path("configs", config_name) ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) class CheckpointLoaderSimple: @classmethod def INPUT_TYPES(s): return { "required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}), } } RETURN_TYPES = ("MODEL", "CLIP", "VAE") OUTPUT_TOOLTIPS = ("The model used for denoising latents.", "The CLIP model used for encoding text prompts.", "The VAE model used for encoding and decoding images to and from latent space.") FUNCTION = "load_checkpoint" CATEGORY = "loaders" DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents." SHORT_DESCRIPTION = "Loads a diffusion model checkpoint for denoising latents." SEARCH_ALIASES = ["load model", "checkpoint", "model loader", "load checkpoint", "ckpt", "model"] def load_checkpoint(self, ckpt_name): ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) return out[:3] class DiffusersLoader: SEARCH_ALIASES = ["load diffusers model"] @classmethod def INPUT_TYPES(cls): paths = [] for search_path in folder_paths.get_folder_paths("diffusers"): if os.path.exists(search_path): for root, subdir, files in os.walk(search_path, followlinks=True): if "model_index.json" in files: paths.append(os.path.relpath(root, start=search_path)) return {"required": {"model_path": (paths,), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders/deprecated" DESCRIPTION = "Loads a diffusion model from the Hugging Face diffusers format, outputting the model, CLIP, and VAE components." SHORT_DESCRIPTION = "Loads diffusers-format models into model, CLIP, and VAE." 
    def load_checkpoint(self, model_path, output_vae=True, output_clip=True):
        for search_path in folder_paths.get_folder_paths("diffusers"):
            if os.path.exists(search_path):
                path = os.path.join(search_path, model_path)
                if os.path.exists(path):
                    model_path = path
                    break

        return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))

class unCLIPCheckpointLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
                              }}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
    FUNCTION = "load_checkpoint"

    CATEGORY = "loaders"
    DESCRIPTION = "Loads an unCLIP checkpoint, outputting the model, CLIP, VAE, and CLIP Vision components needed for image-guided generation."
    SHORT_DESCRIPTION = "Loads unCLIP checkpoints with CLIP Vision output."

    def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
        ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
        out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
        return out

class CLIPSetLastLayer:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip": ("CLIP", ),
                              "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
                              }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "set_last_layer"

    CATEGORY = "conditioning"
    DESCRIPTION = "Sets which CLIP layer to use as the output. Earlier layers (more negative values) can produce different stylistic effects."
    SHORT_DESCRIPTION = "Sets which CLIP layer to use as output."

    def set_last_layer(self, clip, stop_at_clip_layer):
        clip = clip.clone()
        clip.clip_layer(stop_at_clip_layer)
        return (clip,)

class LoraLoader:
    def __init__(self):
        self.loaded_lora = None

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}),
                "clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}),
                "lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}),
            }
        }

    RETURN_TYPES = ("MODEL", "CLIP")
    OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.")
    FUNCTION = "load_lora"

    CATEGORY = "loaders"
    DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together."
    SHORT_DESCRIPTION = "Modifies diffusion and CLIP models using LoRA adjustments."
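    # load_lora below caches the most recently loaded LoRA state dict on the
    # node instance as a (path, state_dict) tuple in self.loaded_lora, so
    # re-running a workflow with the same file skips the disk read; the
    # cache is dropped as soon as a different path is requested.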
SEARCH_ALIASES = ["lora", "load lora", "apply lora", "lora loader", "lora model"] def load_lora(self, model, clip, lora_name, strength_model, strength_clip): if strength_model == 0 and strength_clip == 0: return (model, clip) lora_path = folder_paths.get_full_path_or_raise("loras", lora_name) lora = None if self.loaded_lora is not None: if self.loaded_lora[0] == lora_path: lora = self.loaded_lora[1] else: self.loaded_lora = None if lora is None: lora = comfy.utils.load_torch_file(lora_path, safe_load=True) self.loaded_lora = (lora_path, lora) model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) return (model_lora, clip_lora) class LoraLoaderModelOnly(LoraLoader): @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "lora_name": (folder_paths.get_filename_list("loras"), ), "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), }} RETURN_TYPES = ("MODEL",) FUNCTION = "load_lora_model_only" DESCRIPTION = "Loads a LoRA and applies it to the diffusion model only, without modifying the CLIP model." SHORT_DESCRIPTION = "Applies a LoRA to the diffusion model only." def load_lora_model_only(self, model, lora_name, strength_model): return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) class VAELoader: video_taes = ["taehv", "lighttaew2_2", "lighttaew2_1", "lighttaehy1_5", "taeltx_2"] image_taes = ["taesd", "taesdxl", "taesd3", "taef1"] @staticmethod def vae_list(s): vaes = folder_paths.get_filename_list("vae") approx_vaes = folder_paths.get_filename_list("vae_approx") sdxl_taesd_enc = False sdxl_taesd_dec = False sd1_taesd_enc = False sd1_taesd_dec = False sd3_taesd_enc = False sd3_taesd_dec = False f1_taesd_enc = False f1_taesd_dec = False for v in approx_vaes: if v.startswith("taesd_decoder."): sd1_taesd_dec = True elif v.startswith("taesd_encoder."): sd1_taesd_enc = True elif v.startswith("taesdxl_decoder."): sdxl_taesd_dec = True elif v.startswith("taesdxl_encoder."): sdxl_taesd_enc = True elif v.startswith("taesd3_decoder."): sd3_taesd_dec = True elif v.startswith("taesd3_encoder."): sd3_taesd_enc = True elif v.startswith("taef1_encoder."): f1_taesd_dec = True elif v.startswith("taef1_decoder."): f1_taesd_enc = True else: for tae in s.video_taes: if v.startswith(tae): vaes.append(v) if sd1_taesd_dec and sd1_taesd_enc: vaes.append("taesd") if sdxl_taesd_dec and sdxl_taesd_enc: vaes.append("taesdxl") if sd3_taesd_dec and sd3_taesd_enc: vaes.append("taesd3") if f1_taesd_dec and f1_taesd_enc: vaes.append("taef1") vaes.append("pixel_space") return vaes @staticmethod def load_taesd(name): sd = {} approx_vaes = folder_paths.get_filename_list("vae_approx") encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder)) for k in enc: sd["taesd_encoder.{}".format(k)] = enc[k] dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder)) for k in dec: sd["taesd_decoder.{}".format(k)] = dec[k] if name == "taesd": sd["vae_scale"] = torch.tensor(0.18215) sd["vae_shift"] = torch.tensor(0.0) elif name == "taesdxl": sd["vae_scale"] = torch.tensor(0.13025) sd["vae_shift"] = torch.tensor(0.0) elif name == "taesd3": sd["vae_scale"] = torch.tensor(1.5305) sd["vae_shift"] = torch.tensor(0.0609) elif name == "taef1": sd["vae_scale"] = torch.tensor(0.3611) 
sd["vae_shift"] = torch.tensor(0.1159) return sd @classmethod def INPUT_TYPES(s): return {"required": { "vae_name": (s.vae_list(s), )}} RETURN_TYPES = ("VAE",) FUNCTION = "load_vae" CATEGORY = "loaders" DESCRIPTION = "Loads a VAE model for encoding and decoding images to and from latent space, supporting full VAEs, tiny autoencoders (TAESD), and video VAEs." SHORT_DESCRIPTION = "Loads a VAE model for image encoding and decoding." #TODO: scale factor? def load_vae(self, vae_name): metadata = None if vae_name == "pixel_space": sd = {} sd["pixel_space_vae"] = torch.tensor(1.0) elif vae_name in self.image_taes: sd = self.load_taesd(vae_name) else: if os.path.splitext(vae_name)[0] in self.video_taes: vae_path = folder_paths.get_full_path_or_raise("vae_approx", vae_name) else: vae_path = folder_paths.get_full_path_or_raise("vae", vae_name) sd, metadata = comfy.utils.load_torch_file(vae_path, return_metadata=True) vae = comfy.sd.VAE(sd=sd, metadata=metadata) vae.throw_exception_if_invalid() return (vae,) class ControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" DESCRIPTION = "Loads a ControlNet model from file for guiding image generation with structural conditioning." SHORT_DESCRIPTION = "Loads a ControlNet model from file." SEARCH_ALIASES = ["controlnet", "control net", "cn", "load controlnet", "controlnet loader"] def load_controlnet(self, control_net_name): controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) controlnet = comfy.controlnet.load_controlnet(controlnet_path) if controlnet is None: raise RuntimeError("ERROR: controlnet file is invalid and does not contain a valid controlnet model.") return (controlnet,) class DiffControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" DESCRIPTION = "Loads a differential ControlNet model that requires a base diffusion model for initialization." SHORT_DESCRIPTION = "Loads a differential ControlNet with a base model." def load_controlnet(self, model, control_net_name): controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) return (controlnet,) class ControlNetApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "control_net": ("CONTROL_NET", ), "image": ("IMAGE", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_controlnet" DEPRECATED = True CATEGORY = "conditioning/controlnet" DESCRIPTION = "Applies a ControlNet to conditioning with an image hint and strength. Deprecated in favor of ControlNetApplyAdvanced." SHORT_DESCRIPTION = "Applies ControlNet to conditioning (deprecated)." 
    def apply_controlnet(self, conditioning, control_net, image, strength):
        if strength == 0:
            return (conditioning, )

        c = []
        control_hint = image.movedim(-1, 1)
        for t in conditioning:
            n = [t[0], t[1].copy()]
            c_net = control_net.copy().set_cond_hint(control_hint, strength)
            if 'control' in t[1]:
                c_net.set_previous_controlnet(t[1]['control'])
            n[1]['control'] = c_net
            n[1]['control_apply_to_uncond'] = True
            c.append(n)
        return (c, )

class ControlNetApplyAdvanced:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"positive": ("CONDITIONING", ),
                             "negative": ("CONDITIONING", ),
                             "control_net": ("CONTROL_NET", ),
                             "image": ("IMAGE", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
                             "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
                             },
                "optional": {"vae": ("VAE", ),
                             }}

    RETURN_TYPES = ("CONDITIONING", "CONDITIONING")
    RETURN_NAMES = ("positive", "negative")
    FUNCTION = "apply_controlnet"

    CATEGORY = "conditioning/controlnet"
    DESCRIPTION = "Applies a ControlNet to both positive and negative conditioning with an image hint, adjustable strength, and start/end percentage controls for scheduling."
    SHORT_DESCRIPTION = "Applies ControlNet with strength and timestep scheduling."
    SEARCH_ALIASES = ["controlnet", "apply controlnet", "use controlnet", "control net"]

    def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]):
        if strength == 0:
            return (positive, negative)

        control_hint = image.movedim(-1, 1)
        cnets = {}

        out = []
        for conditioning in [positive, negative]:
            c = []
            for t in conditioning:
                d = t[1].copy()

                prev_cnet = d.get('control', None)
                if prev_cnet in cnets:
                    c_net = cnets[prev_cnet]
                else:
                    c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat)
                    c_net.set_previous_controlnet(prev_cnet)
                    cnets[prev_cnet] = c_net

                d['control'] = c_net
                d['control_apply_to_uncond'] = False
                n = [t[0], d]
                c.append(n)
            out.append(c)
        return (out[0], out[1])

class UNETLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ),
                              "weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],)
                              }}
    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_unet"

    CATEGORY = "advanced/loaders"
    DESCRIPTION = "Loads a standalone diffusion model (UNET/DiT) with optional weight dtype selection including fp8 precision modes for lower memory usage."
    SHORT_DESCRIPTION = "Loads a diffusion model with optional fp8 precision."
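    # fp8_e4m3fn and fp8_e5m2 store weights in 8 bits (4-bit vs 5-bit
    # exponent respectively), roughly halving memory versus fp16 at some
    # quality cost; the "_fast" variant additionally enables fp8 compute
    # optimizations on hardware that supports them.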
    def load_unet(self, unet_name, weight_dtype):
        model_options = {}
        if weight_dtype == "fp8_e4m3fn":
            model_options["dtype"] = torch.float8_e4m3fn
        elif weight_dtype == "fp8_e4m3fn_fast":
            model_options["dtype"] = torch.float8_e4m3fn
            model_options["fp8_optimizations"] = True
        elif weight_dtype == "fp8_e5m2":
            model_options["dtype"] = torch.float8_e5m2

        unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name)
        model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options)
        return (model,)

class CLIPLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ),
                              "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos", "lumina2", "wan", "hidream", "chroma", "ace", "omnigen2", "qwen_image", "hunyuan_image", "flux2", "ovis"], ),
                              },
                "optional": {
                              "device": (["default", "cpu"], {"advanced": True}),
                             }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"

    CATEGORY = "advanced/loaders"

    DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 xxl / clip-g / clip-l\nstable_audio: t5 base\nmochi: t5 xxl\ncosmos: old t5 xxl\nlumina2: gemma 2 2B\nwan: umt5 xxl\nhidream: llama-3.1 (recommended) or t5\nomnigen2: qwen vl 2.5 3B"
    SHORT_DESCRIPTION = "Loads a single CLIP model with architecture-specific recipes."

    def load_clip(self, clip_name, type="stable_diffusion", device="default"):
        clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION)

        model_options = {}
        if device == "cpu":
            model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")

        clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name)
        clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
        return (clip,)

class DualCLIPLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ),
                              "clip_name2": (folder_paths.get_filename_list("text_encoders"), ),
                              "type": (["sdxl", "sd3", "flux", "hunyuan_video", "hidream", "hunyuan_image", "hunyuan_video_15", "kandinsky5", "kandinsky5_image", "ltxv", "newbie", "ace"], ),
                              },
                "optional": {
                              "device": (["default", "cpu"], {"advanced": True}),
                             }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"

    CATEGORY = "advanced/loaders"

    DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5\nhidream: at least one of t5 or llama, recommended t5 and llama\nhunyuan_image: qwen2.5vl 7b and byt5 small\nnewbie: gemma-3-4b-it, jina clip v2"
    SHORT_DESCRIPTION = "Loads two CLIP models simultaneously with architecture recipes."
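    # The "type" string is resolved to a comfy.sd.CLIPType enum member by
    # name below (e.g. "sd3" -> comfy.sd.CLIPType.SD3), falling back to
    # STABLE_DIFFUSION when no member of that name exists.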
    def load_clip(self, clip_name1, clip_name2, type, device="default"):
        clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION)

        clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1)
        clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2)

        model_options = {}
        if device == "cpu":
            model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")

        clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
        return (clip,)

class CLIPVisionLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ),
                             }}
    RETURN_TYPES = ("CLIP_VISION",)
    FUNCTION = "load_clip"

    CATEGORY = "loaders"
    DESCRIPTION = "Loads a CLIP Vision model for encoding images into CLIP vision embeddings."
    SHORT_DESCRIPTION = "Loads a CLIP Vision model for image encoding."

    def load_clip(self, clip_name):
        clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name)
        clip_vision = comfy.clip_vision.load(clip_path)
        if clip_vision is None:
            raise RuntimeError("ERROR: clip vision file is invalid and does not contain a valid vision model.")
        return (clip_vision,)

class CLIPVisionEncode:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_vision": ("CLIP_VISION",),
                              "image": ("IMAGE",),
                              "crop": (["center", "none"],)
                             }}
    RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
    FUNCTION = "encode"

    CATEGORY = "conditioning"
    DESCRIPTION = "Encodes an image using a CLIP Vision model to produce a vision embedding, with optional center cropping."
    SHORT_DESCRIPTION = "Encodes images into CLIP Vision embeddings."

    def encode(self, clip_vision, image, crop):
        crop_image = True
        if crop != "center":
            crop_image = False
        output = clip_vision.encode_image(image, crop=crop_image)
        return (output,)

class StyleModelLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}}

    RETURN_TYPES = ("STYLE_MODEL",)
    FUNCTION = "load_style_model"

    CATEGORY = "loaders"
    DESCRIPTION = "Loads a style model from file for applying visual styles to conditioning."
    SHORT_DESCRIPTION = "Loads a style model from file."

    def load_style_model(self, style_model_name):
        style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name)
        style_model = comfy.sd.load_style_model(style_model_path)
        return (style_model,)

class StyleModelApply:
    SEARCH_ALIASES = ["style transfer"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "style_model": ("STYLE_MODEL", ),
                             "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}),
                             "strength_type": (["multiply", "attn_bias"], ),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_stylemodel"

    CATEGORY = "conditioning/style_model"
    DESCRIPTION = "Applies a style model to conditioning using a CLIP Vision output, with adjustable strength via multiply or attention bias modes."
    SHORT_DESCRIPTION = "Applies a style model to conditioning with strength control."
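    # In "attn_bias" mode the strength is applied as an additive log-space
    # bias on the attention mask rather than a multiplier on the style
    # tokens: log(1.0) = 0 leaves attention unchanged, strength < 1
    # suppresses the style tokens, and strength > 1 emphasizes them.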
    def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength, strength_type):
        cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
        if strength_type == "multiply":
            cond *= strength

        n = cond.shape[1]
        c_out = []
        for t in conditioning:
            (txt, keys) = t
            keys = keys.copy()
            # even if the strength is 1.0 (i.e., no change), if there's already a mask, we have to add to it
            if "attention_mask" in keys or (strength_type == "attn_bias" and strength != 1.0):
                # math.log raises an error if the argument is zero
                # torch.log returns -inf, which is what we want
                attn_bias = torch.log(torch.Tensor([strength if strength_type == "attn_bias" else 1.0]))
                # get the size of the mask image
                mask_ref_size = keys.get("attention_mask_img_shape", (1, 1))
                n_ref = mask_ref_size[0] * mask_ref_size[1]
                n_txt = txt.shape[1]
                # grab the existing mask
                mask = keys.get("attention_mask", None)
                # create a default mask if it doesn't exist
                if mask is None:
                    mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16)
                # convert the mask dtype, because it might be boolean
                # we want it to be interpreted as a bias
                if mask.dtype == torch.bool:
                    # log(True) = log(1) = 0
                    # log(False) = log(0) = -inf
                    mask = torch.log(mask.to(dtype=torch.float16))
                # now we make the mask bigger to add space for our new tokens
                new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16)
                # copy over the old mask, in quadrants
                new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt]
                new_mask[:, :n_txt, n_txt+n:] = mask[:, :n_txt, n_txt:]
                new_mask[:, n_txt+n:, :n_txt] = mask[:, n_txt:, :n_txt]
                new_mask[:, n_txt+n:, n_txt+n:] = mask[:, n_txt:, n_txt:]
                # now fill in the attention bias to our redux tokens
                new_mask[:, :n_txt, n_txt:n_txt+n] = attn_bias
                new_mask[:, n_txt+n:, n_txt:n_txt+n] = attn_bias
                keys["attention_mask"] = new_mask.to(txt.device)
                keys["attention_mask_img_shape"] = mask_ref_size

            c_out.append([torch.cat((txt, cond), dim=1), keys])

        return (c_out,)

class unCLIPConditioning:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
                             "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_adm"

    CATEGORY = "conditioning"
    DESCRIPTION = "Applies unCLIP image conditioning by adding CLIP vision embeddings to conditioning with adjustable strength and noise augmentation."
    SHORT_DESCRIPTION = "Applies unCLIP image conditioning with vision embeddings."

    def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
        if strength == 0:
            return (conditioning, )

        c = node_helpers.conditioning_set_values(conditioning, {"unclip_conditioning": [{"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}]}, append=True)
        return (c, )

class GLIGENLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}}

    RETURN_TYPES = ("GLIGEN",)
    FUNCTION = "load_gligen"

    CATEGORY = "loaders"
    DESCRIPTION = "Loads a GLIGEN model for spatially-grounded text-to-image generation."
    def load_gligen(self, gligen_name):
        gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name)
        gligen = comfy.sd.load_gligen(gligen_path)
        return (gligen,)

class GLIGENTextBoxApply:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_to": ("CONDITIONING", ),
                             "clip": ("CLIP", ),
                             "gligen_textbox_model": ("GLIGEN", ),
                             "text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
                             "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
                             "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
                             "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning/gligen"
    DESCRIPTION = "Applies GLIGEN text box conditioning to place a text-described object at a specific bounding box position in the generated image."
    SHORT_DESCRIPTION = "Places a text-described object at a bounding box position."

    def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
        c = []
        cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
        for t in conditioning_to:
            n = [t[0], t[1].copy()]
            position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
            prev = []
            if "gligen" in n[1]:
                prev = n[1]['gligen'][2]

            n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
            c.append(n)
        return (c, )

class EmptyLatentImage:
    def __init__(self):
        self.device = comfy.model_management.intermediate_device()

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}),
                "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}),
                "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."})
            }
        }
    RETURN_TYPES = ("LATENT",)
    OUTPUT_TOOLTIPS = ("The empty latent image batch.",)
    FUNCTION = "generate"

    CATEGORY = "latent"
    DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling."
    SHORT_DESCRIPTION = "Creates empty latent images for denoising via sampling."
    SEARCH_ALIASES = ["empty", "empty latent", "new latent", "create latent", "blank latent", "blank"]

    def generate(self, width, height, batch_size=1):
        latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device)
        return ({"samples": latent, "downscale_ratio_spacial": 8}, )


class LatentFromBatch:
    SEARCH_ALIASES = ["select from batch", "pick latent", "batch subset"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
                              "length": ("INT", {"default": 1, "min": 1, "max": 64}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "frombatch"

    CATEGORY = "latent/batch"
    DESCRIPTION = "Extracts a contiguous range of samples from a latent batch by specifying a start index and length."
    SHORT_DESCRIPTION = "Extracts a range of samples from a latent batch."
    def frombatch(self, samples, batch_index, length):
        s = samples.copy()
        s_in = samples["samples"]
        batch_index = min(s_in.shape[0] - 1, batch_index)
        length = min(s_in.shape[0] - batch_index, length)
        s["samples"] = s_in[batch_index:batch_index + length].clone()
        if "noise_mask" in samples:
            masks = samples["noise_mask"]
            if masks.shape[0] == 1:
                s["noise_mask"] = masks.clone()
            else:
                if masks.shape[0] < s_in.shape[0]:
                    masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
                s["noise_mask"] = masks[batch_index:batch_index + length].clone()
        if "batch_index" not in s:
            s["batch_index"] = [x for x in range(batch_index, batch_index + length)]
        else:
            s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
        return (s,)

class RepeatLatentBatch:
    SEARCH_ALIASES = ["duplicate latent", "clone latent"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "amount": ("INT", {"default": 1, "min": 1, "max": 64}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "repeat"

    CATEGORY = "latent/batch"
    DESCRIPTION = "Repeats a latent batch a specified number of times to create a larger batch."
    SHORT_DESCRIPTION = "Duplicates latent batch samples a specified number of times."

    def repeat(self, samples, amount):
        s = samples.copy()
        s_in = samples["samples"]

        s["samples"] = s_in.repeat((amount,) + ((1,) * (s_in.ndim - 1)))
        if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
            masks = samples["noise_mask"]
            if masks.shape[0] < s_in.shape[0]:
                # tile the masks so there is one per sample before repeating
                masks = masks.repeat((math.ceil(s_in.shape[0] / masks.shape[0]),) + ((1,) * (masks.ndim - 1)))[:s_in.shape[0]]
            s["noise_mask"] = masks.repeat((amount,) + ((1,) * (masks.ndim - 1)))
        if "batch_index" in s:
            offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
            s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
        return (s,)

class LatentUpscale:
    SEARCH_ALIASES = ["enlarge latent", "resize latent"]
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
    crop_methods = ["disabled", "center"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "upscale_method": (s.upscale_methods,),
                              "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "crop": (s.crop_methods,)}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"

    CATEGORY = "latent"
    DESCRIPTION = "Upscales latent representations to a target width and height using various interpolation methods, with optional cropping."
    SHORT_DESCRIPTION = "Upscales latents to a target resolution."
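    # A width or height of 0 means "keep aspect ratio": the missing
    # dimension is derived from the latent's shape. For example, the latent
    # of a 512x768 image upscaled with width=1024, height=0 yields
    # 1024x1536. Both dimensions are clamped to at least 64 pixels.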
    def upscale(self, samples, upscale_method, width, height, crop):
        if width == 0 and height == 0:
            s = samples
        else:
            s = samples.copy()

            if width == 0:
                height = max(64, height)
                width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2]))
            elif height == 0:
                width = max(64, width)
                height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1]))
            else:
                width = max(64, width)
                height = max(64, height)

            s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
        return (s,)

class LatentUpscaleBy:
    SEARCH_ALIASES = ["enlarge latent", "resize latent", "scale latent"]
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "upscale_method": (s.upscale_methods,),
                              "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"

    CATEGORY = "latent"
    DESCRIPTION = "Upscales latent representations by a relative scale factor using various interpolation methods."

    def upscale(self, samples, upscale_method, scale_by):
        s = samples.copy()
        width = round(samples["samples"].shape[-1] * scale_by)
        height = round(samples["samples"].shape[-2] * scale_by)
        s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
        return (s,)

class LatentRotate:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "rotate"

    CATEGORY = "latent/transform"
    DESCRIPTION = "Rotates latent representations by 90, 180, or 270 degrees."
    SHORT_DESCRIPTION = None

    def rotate(self, samples, rotation):
        s = samples.copy()
        rotate_by = 0
        if rotation.startswith("90"):
            rotate_by = 1
        elif rotation.startswith("180"):
            rotate_by = 2
        elif rotation.startswith("270"):
            rotate_by = 3

        s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
        return (s,)

class LatentFlip:
    SEARCH_ALIASES = ["mirror latent"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "flip"

    CATEGORY = "latent/transform"
    DESCRIPTION = "Flips latent representations vertically or horizontally."
    SHORT_DESCRIPTION = None

    def flip(self, samples, flip_method):
        s = samples.copy()
        if flip_method.startswith("x"):
            s["samples"] = torch.flip(samples["samples"], dims=[2])
        elif flip_method.startswith("y"):
            s["samples"] = torch.flip(samples["samples"], dims=[3])

        return (s,)

class LatentComposite:
    SEARCH_ALIASES = ["overlay latent", "layer latent", "paste latent"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples_to": ("LATENT",),
                              "samples_from": ("LATENT",),
                              "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "composite"

    CATEGORY = "latent"
    DESCRIPTION = "Composites one latent onto another at a specified position with optional feathered blending at the edges."
    SHORT_DESCRIPTION = "Composites one latent onto another with feathering."
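    # Feathering builds a linear ramp mask over feather // 8 latent cells
    # along each pasted edge that does not touch the destination border,
    # then lerps between the pasted and underlying latents:
    # out = from * mask + to * (1 - mask).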
    def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
        x = x // 8
        y = y // 8
        feather = feather // 8
        samples_out = samples_to.copy()
        s = samples_to["samples"].clone()
        samples_to = samples_to["samples"]
        samples_from = samples_from["samples"]
        if feather == 0:
            s[:, :, y:y+samples_from.shape[2], x:x+samples_from.shape[3]] = samples_from[:, :, :samples_to.shape[2] - y, :samples_to.shape[3] - x]
        else:
            samples_from = samples_from[:, :, :samples_to.shape[2] - y, :samples_to.shape[3] - x]
            mask = torch.ones_like(samples_from)
            for t in range(feather):
                if y != 0:
                    mask[:, :, t:1+t, :] *= ((1.0 / feather) * (t + 1))

                if y + samples_from.shape[2] < samples_to.shape[2]:
                    mask[:, :, mask.shape[2] - 1 - t:mask.shape[2] - t, :] *= ((1.0 / feather) * (t + 1))
                if x != 0:
                    mask[:, :, :, t:1+t] *= ((1.0 / feather) * (t + 1))
                if x + samples_from.shape[3] < samples_to.shape[3]:
                    mask[:, :, :, mask.shape[3] - 1 - t:mask.shape[3] - t] *= ((1.0 / feather) * (t + 1))
            rev_mask = torch.ones_like(mask) - mask
            s[:, :, y:y+samples_from.shape[2], x:x+samples_from.shape[3]] = samples_from * mask + s[:, :, y:y+samples_from.shape[2], x:x+samples_from.shape[3]] * rev_mask
        samples_out["samples"] = s
        return (samples_out,)

class LatentBlend:
    SEARCH_ALIASES = ["mix latents", "interpolate latents"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "samples1": ("LATENT",),
            "samples2": ("LATENT",),
            "blend_factor": ("FLOAT", {
                "default": 0.5,
                "min": 0,
                "max": 1,
                "step": 0.01
            }),
        }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "blend"

    CATEGORY = "_for_testing"
    DESCRIPTION = "Blends two latent representations together using a blend factor, automatically resizing if dimensions differ."
    SHORT_DESCRIPTION = "Blends two latents together using a blend factor."

    def blend(self, samples1, samples2, blend_factor: float, blend_mode: str = "normal"):
        samples_out = samples1.copy()
        samples1 = samples1["samples"]
        samples2 = samples2["samples"]

        if samples1.shape != samples2.shape:
            # latents are already NCHW, so they can be resized directly
            samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')

        samples_blended = self.blend_mode(samples1, samples2, blend_mode)
        samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
        samples_out["samples"] = samples_blended
        return (samples_out,)

    def blend_mode(self, img1, img2, mode):
        if mode == "normal":
            return img2
        else:
            raise ValueError(f"Unsupported blend mode: {mode}")

class LatentCrop:
    SEARCH_ALIASES = ["trim latent", "cut latent"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                              "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                              "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "crop"

    CATEGORY = "latent/transform"
    DESCRIPTION = "Crops a latent representation to a specified width, height, and position."
SHORT_DESCRIPTION = None def crop(self, samples, width, height, x, y): s = samples.copy() samples = samples['samples'] x = x // 8 y = y // 8 #enforce minimum size of 64 if x > (samples.shape[3] - 8): x = samples.shape[3] - 8 if y > (samples.shape[2] - 8): y = samples.shape[2] - 8 new_height = height // 8 new_width = width // 8 to_x = new_width + x to_y = new_height + y s['samples'] = samples[:,:,y:to_y, x:to_x] return (s,) class SetLatentNoiseMask: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "mask": ("MASK",), }} RETURN_TYPES = ("LATENT",) FUNCTION = "set_mask" CATEGORY = "latent/inpaint" DESCRIPTION = "Sets a noise mask on a latent so that sampling only adds noise within the masked region, used for inpainting workflows." SHORT_DESCRIPTION = "Sets a noise mask on latent for inpainting." def set_mask(self, samples, mask): s = samples.copy() s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) return (s,) def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): latent_image = latent["samples"] latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image, latent.get("downscale_ratio_spacial", None)) if disable_noise: noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") else: batch_inds = latent["batch_index"] if "batch_index" in latent else None noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) noise_mask = None if "noise_mask" in latent: noise_mask = latent["noise_mask"] callback = latent_preview.prepare_callback(model, steps) disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) out = latent.copy() out.pop("downscale_ratio_spacial", None) out["samples"] = samples return (out, ) class KSampler: @classmethod def INPUT_TYPES(s): return { "required": { "model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}), "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True, "tooltip": "The random seed used for creating the noise."}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. 
Higher values result in images more closely matching the prompt; however, values that are too high will negatively impact quality."}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling; this can affect the quality, speed, and style of the generated output."}), "scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}), "positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}), "negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}), "latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied; lower values maintain the structure of the initial image, allowing for image-to-image sampling."}), } } RETURN_TYPES = ("LATENT",) OUTPUT_TOOLTIPS = ("The denoised latent.",) FUNCTION = "sample" CATEGORY = "sampling" DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image." SHORT_DESCRIPTION = "Denoises latent images using model and conditioning inputs." SEARCH_ALIASES = ["sampler", "sample", "generate", "denoise", "diffuse", "txt2img", "img2img"] def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) class KSamplerAdvanced: @classmethod def INPUT_TYPES(s): return {"required": {"model": ("MODEL",), "add_noise": (["enable", "disable"], ), "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), "positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "latent_image": ("LATENT", ), "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), "return_with_leftover_noise": (["disable", "enable"], ), } } RETURN_TYPES = ("LATENT",) FUNCTION = "sample" CATEGORY = "sampling" DESCRIPTION = "Advanced sampler with fine-grained control over noise addition, start/end steps, and whether to return with leftover noise for multi-pass sampling." SHORT_DESCRIPTION = "Advanced sampler with step range and noise controls."
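# Usage sketch for the sample method below (illustration only; the model and
# conditioning objects are placeholders, and the sampler/scheduler names are
# just examples). A common two-pass pattern runs a base pass that stops early
# and keeps its leftover noise, then a second pass that finishes the run:
#
#   sampler = KSamplerAdvanced()
#   (half_done,) = sampler.sample(model, "enable", 0, 20, 8.0, "euler", "normal",
#                                 positive, negative, latent, 0, 12, "enable")
#   (final,) = sampler.sample(model, "disable", 0, 20, 8.0, "euler", "normal",
#                             positive, negative, half_done, 12, 10000, "disable")
#
# The first call adds noise and samples steps 0-12 without fully denoising;
# the second disables noise injection and samples from step 12 onward.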
def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): force_full_denoise = True if return_with_leftover_noise == "enable": force_full_denoise = False disable_noise = False if add_noise == "disable": disable_noise = True return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) class SaveImage: def __init__(self): self.output_dir = folder_paths.get_output_directory() self.type = "output" self.prefix_append = "" self.compress_level = 4 @classmethod def INPUT_TYPES(s): return { "required": { "images": ("IMAGE", {"tooltip": "The images to save."}), "filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."}) }, "hidden": { "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO" }, } RETURN_TYPES = () FUNCTION = "save_images" OUTPUT_NODE = True CATEGORY = "image" DESCRIPTION = "Saves the input images to your ComfyUI output directory." SHORT_DESCRIPTION = None SEARCH_ALIASES = ["save", "save image", "export image", "output image", "write image", "download"] def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): filename_prefix += self.prefix_append full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) results = list() for (batch_number, image) in enumerate(images): i = 255. * image.cpu().numpy() img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) metadata = None if not args.disable_metadata: metadata = PngInfo() if prompt is not None: metadata.add_text("prompt", json.dumps(prompt)) if extra_pnginfo is not None: for x in extra_pnginfo: metadata.add_text(x, json.dumps(extra_pnginfo[x])) filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) file = f"{filename_with_batch_num}_{counter:05}_.png" img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level) results.append({ "filename": file, "subfolder": subfolder, "type": self.type }) counter += 1 return { "ui": { "images": results } } class PreviewImage(SaveImage): def __init__(self): self.output_dir = folder_paths.get_temp_directory() self.type = "temp" self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstuvwxyz") for x in range(5)) self.compress_level = 1 SEARCH_ALIASES = ["preview", "preview image", "show image", "view image", "display image", "image viewer"] DESCRIPTION = "Previews images in the UI by saving them as temporary files that are not permanently stored." SHORT_DESCRIPTION = "Previews images in the UI as temporary files."
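# Note on the SaveImage output above (sketch; file names are illustrative):
# each image in the batch is written as "<prefix>_<counter:05>_.png", so a
# prefix of "ComfyUI" with counter 7 yields "ComfyUI_00007_.png". Workflow
# metadata (the "prompt" and any extra_pnginfo entries) is embedded as PNG
# text chunks unless metadata is disabled via the CLI args, e.g.:
#
#   from PIL import Image
#   info = Image.open("ComfyUI_00007_.png").info  # hypothetical output file
#   workflow_json = info.get("prompt")            # JSON string, if metadata was enabled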
@classmethod def INPUT_TYPES(s): return {"required": {"images": ("IMAGE", ), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } class LoadImage: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] files = folder_paths.filter_files_content_types(files, ["image"]) return {"required": {"image": (sorted(files), {"image_upload": True})}, } CATEGORY = "image" DESCRIPTION = "Loads an image from the input directory, supporting animated formats and alpha channel extraction as a mask output." SHORT_DESCRIPTION = "Loads an image file with optional alpha mask output." SEARCH_ALIASES = ["load image", "open image", "import image", "image input", "upload image", "read image", "image loader"] RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "load_image" def load_image(self, image): image_path = folder_paths.get_annotated_filepath(image) img = node_helpers.pillow(Image.open, image_path) output_images = [] output_masks = [] w, h = None, None for i in ImageSequence.Iterator(img): i = node_helpers.pillow(ImageOps.exif_transpose, i) if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) image = i.convert("RGB") if len(output_images) == 0: w = image.size[0] h = image.size[1] if image.size[0] != w or image.size[1] != h: continue image = np.array(image).astype(np.float32) / 255.0 image = torch.from_numpy(image)[None,] if 'A' in i.getbands(): mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 mask = 1. - torch.from_numpy(mask) elif i.mode == 'P' and 'transparency' in i.info: mask = np.array(i.convert('RGBA').getchannel('A')).astype(np.float32) / 255.0 mask = 1. - torch.from_numpy(mask) else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") output_images.append(image) output_masks.append(mask.unsqueeze(0)) if img.format == "MPO": break # ignore all frames except the first one for MPO format if len(output_images) > 1: output_image = torch.cat(output_images, dim=0) output_mask = torch.cat(output_masks, dim=0) else: output_image = output_images[0] output_mask = output_masks[0] return (output_image, output_mask) @classmethod def IS_CHANGED(s, image): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class LoadImageMask: SEARCH_ALIASES = ["import mask", "alpha mask", "channel mask"] _color_channels = ["alpha", "red", "green", "blue"] @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] return {"required": {"image": (sorted(files), {"image_upload": True}), "channel": (s._color_channels, ), } } CATEGORY = "mask" DESCRIPTION = "Loads an image and extracts a specific color channel as a mask." SHORT_DESCRIPTION = "Loads an image channel as a mask." 
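# Note on the LoadImage mask convention above (illustration only): the alpha
# channel is inverted (mask = 1.0 - alpha), so fully transparent pixels become
# 1.0 in the MASK output and opaque pixels become 0.0, matching the inpainting
# convention where 1.0 marks the region to regenerate. Images without alpha
# produce a 64x64 all-zero placeholder mask. A minimal check, assuming a
# hypothetical RGBA file "example.png" in the input directory:
#
#   (image, mask) = LoadImage().load_image("example.png")
#   # image: [frames, H, W, 3] float32 in [0, 1]; mask: [frames, H, W]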
RETURN_TYPES = ("MASK",) FUNCTION = "load_image" def load_image(self, image, channel): image_path = folder_paths.get_annotated_filepath(image) i = node_helpers.pillow(Image.open, image_path) i = node_helpers.pillow(ImageOps.exif_transpose, i) if i.getbands() != ("R", "G", "B", "A"): if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) i = i.convert("RGBA") mask = None c = channel[0].upper() if c in i.getbands(): mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 mask = torch.from_numpy(mask) if c == 'A': mask = 1. - mask else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") return (mask.unsqueeze(0),) @classmethod def IS_CHANGED(s, image, channel): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class LoadImageOutput(LoadImage): SEARCH_ALIASES = ["output image", "previous generation"] @classmethod def INPUT_TYPES(s): return { "required": { "image": ("COMBO", { "image_upload": True, "image_folder": "output", "remote": { "route": "/internal/files/output", "refresh_button": True, "control_after_refresh": "first", }, }), } } DESCRIPTION = "Load an image from the output folder. When the refresh button is clicked, the node will update the image list and automatically select the first image, allowing for easy iteration." SHORT_DESCRIPTION = "Loads images from the output folder with auto-refresh." EXPERIMENTAL = True FUNCTION = "load_image" class ImageScale: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] crop_methods = ["disabled", "center"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "crop": (s.crop_methods,)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" DESCRIPTION = "Scales an image to a target width and height using various interpolation methods, with optional center cropping." SHORT_DESCRIPTION = "Scales images to a target width and height." SEARCH_ALIASES = ["resize", "resize image", "scale image", "image resize", "zoom", "zoom in", "change size"] def upscale(self, image, upscale_method, width, height, crop): if width == 0 and height == 0: s = image else: samples = image.movedim(-1,1) if width == 0: width = max(1, round(samples.shape[3] * height / samples.shape[2])) elif height == 0: height = max(1, round(samples.shape[2] * width / samples.shape[3])) s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) s = s.movedim(1,-1) return (s,) class ImageScaleBy: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" DESCRIPTION = "Scales an image by a relative factor using various interpolation methods." 
SHORT_DESCRIPTION = None def upscale(self, image, upscale_method, scale_by): samples = image.movedim(-1,1) width = round(samples.shape[3] * scale_by) height = round(samples.shape[2] * scale_by) s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") s = s.movedim(1,-1) return (s,) class ImageInvert: SEARCH_ALIASES = ["reverse colors"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "invert" CATEGORY = "image" DESCRIPTION = "Inverts image colors by subtracting each pixel value from 1.0." SHORT_DESCRIPTION = None def invert(self, image): s = 1.0 - image return (s,) class ImageBatch: SEARCH_ALIASES = ["combine images", "merge images", "stack images"] @classmethod def INPUT_TYPES(s): return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "batch" CATEGORY = "image" DESCRIPTION = "Batches two images together into a single batch, automatically resizing if dimensions differ. Deprecated in favor of the general-purpose Batch node." SHORT_DESCRIPTION = "Batches two images together (deprecated)." DEPRECATED = True def batch(self, image1, image2): if image1.shape[-1] != image2.shape[-1]: if image1.shape[-1] > image2.shape[-1]: image2 = torch.nn.functional.pad(image2, (0,1), mode='constant', value=1.0) else: image1 = torch.nn.functional.pad(image1, (0,1), mode='constant', value=1.0) if image1.shape[1:] != image2.shape[1:]: image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) s = torch.cat((image1, image2), dim=0) return (s,) class EmptyImage: def __init__(self, device="cpu"): self.device = device @classmethod def INPUT_TYPES(s): return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), }} RETURN_TYPES = ("IMAGE",) FUNCTION = "generate" CATEGORY = "image" DESCRIPTION = "Creates a batch of blank images filled with a specified color at the given dimensions." SHORT_DESCRIPTION = "Creates blank images filled with a solid color." def generate(self, width, height, batch_size=1, color=0): r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) return (torch.cat((r, g, b), dim=-1), ) class ImagePadForOutpaint: SEARCH_ALIASES = ["extend canvas", "expand image"] @classmethod def INPUT_TYPES(s): return { "required": { "image": ("IMAGE",), "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), } } RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "expand_image" CATEGORY = "image" DESCRIPTION = "Pads an image on all sides for outpainting, generating a feathered mask that blends the original image edges into the new padded area." SHORT_DESCRIPTION = "Pads images with feathered mask for outpainting." 
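# Note on the feather falloff computed in expand_image below (sketch, values
# illustrative): the mask weight for a pixel at distance d from the nearest
# padded edge is ((feathering - d) / feathering) ** 2, a quadratic ramp from
# 1.0 at the seam down to 0.0 at depth `feathering`. With feathering=40, a
# pixel 10 px inside the original image gets weight ((40 - 10) / 40) ** 2 = 0.5625.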
def expand_image(self, image, left, top, right, bottom, feathering): d1, d2, d3, d4 = image.size() new_image = torch.ones( (d1, d2 + top + bottom, d3 + left + right, d4), dtype=torch.float32, ) * 0.5 new_image[:, top:top + d2, left:left + d3, :] = image mask = torch.ones( (d2 + top + bottom, d3 + left + right), dtype=torch.float32, ) t = torch.zeros( (d2, d3), dtype=torch.float32 ) if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: for i in range(d2): for j in range(d3): dt = i if top != 0 else d2 db = d2 - i if bottom != 0 else d2 dl = j if left != 0 else d3 dr = d3 - j if right != 0 else d3 d = min(dt, db, dl, dr) if d >= feathering: continue v = (feathering - d) / feathering t[i, j] = v * v mask[top:top + d2, left:left + d3] = t return (new_image, mask.unsqueeze(0)) NODE_CLASS_MAPPINGS = { "KSampler": KSampler, "CheckpointLoaderSimple": CheckpointLoaderSimple, "CLIPTextEncode": CLIPTextEncode, "CLIPSetLastLayer": CLIPSetLastLayer, "VAEDecode": VAEDecode, "VAEEncode": VAEEncode, "VAEEncodeForInpaint": VAEEncodeForInpaint, "VAELoader": VAELoader, "EmptyLatentImage": EmptyLatentImage, "LatentUpscale": LatentUpscale, "LatentUpscaleBy": LatentUpscaleBy, "LatentFromBatch": LatentFromBatch, "RepeatLatentBatch": RepeatLatentBatch, "SaveImage": SaveImage, "PreviewImage": PreviewImage, "LoadImage": LoadImage, "LoadImageMask": LoadImageMask, "LoadImageOutput": LoadImageOutput, "ImageScale": ImageScale, "ImageScaleBy": ImageScaleBy, "ImageInvert": ImageInvert, "ImageBatch": ImageBatch, "ImagePadForOutpaint": ImagePadForOutpaint, "EmptyImage": EmptyImage, "ConditioningAverage": ConditioningAverage , "ConditioningCombine": ConditioningCombine, "ConditioningConcat": ConditioningConcat, "ConditioningSetArea": ConditioningSetArea, "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, "ConditioningSetAreaStrength": ConditioningSetAreaStrength, "ConditioningSetMask": ConditioningSetMask, "KSamplerAdvanced": KSamplerAdvanced, "SetLatentNoiseMask": SetLatentNoiseMask, "LatentComposite": LatentComposite, "LatentBlend": LatentBlend, "LatentRotate": LatentRotate, "LatentFlip": LatentFlip, "LatentCrop": LatentCrop, "LoraLoader": LoraLoader, "CLIPLoader": CLIPLoader, "UNETLoader": UNETLoader, "DualCLIPLoader": DualCLIPLoader, "CLIPVisionEncode": CLIPVisionEncode, "StyleModelApply": StyleModelApply, "unCLIPConditioning": unCLIPConditioning, "ControlNetApply": ControlNetApply, "ControlNetApplyAdvanced": ControlNetApplyAdvanced, "ControlNetLoader": ControlNetLoader, "DiffControlNetLoader": DiffControlNetLoader, "StyleModelLoader": StyleModelLoader, "CLIPVisionLoader": CLIPVisionLoader, "VAEDecodeTiled": VAEDecodeTiled, "VAEEncodeTiled": VAEEncodeTiled, "unCLIPCheckpointLoader": unCLIPCheckpointLoader, "GLIGENLoader": GLIGENLoader, "GLIGENTextBoxApply": GLIGENTextBoxApply, "InpaintModelConditioning": InpaintModelConditioning, "CheckpointLoader": CheckpointLoader, "DiffusersLoader": DiffusersLoader, "LoadLatent": LoadLatent, "SaveLatent": SaveLatent, "ConditioningZeroOut": ConditioningZeroOut, "ConditioningSetTimestepRange": ConditioningSetTimestepRange, "LoraLoaderModelOnly": LoraLoaderModelOnly, } NODE_DISPLAY_NAME_MAPPINGS = { # Sampling "KSampler": "KSampler", "KSamplerAdvanced": "KSampler (Advanced)", # Loaders "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", "CheckpointLoaderSimple": "Load Checkpoint", "VAELoader": "Load VAE", "LoraLoader": "Load LoRA (Model and CLIP)", "LoraLoaderModelOnly": "Load LoRA", "CLIPLoader": "Load CLIP", "ControlNetLoader": "Load ControlNet 
Model", "DiffControlNetLoader": "Load ControlNet Model (diff)", "StyleModelLoader": "Load Style Model", "CLIPVisionLoader": "Load CLIP Vision", "UNETLoader": "Load Diffusion Model", # Conditioning "CLIPVisionEncode": "CLIP Vision Encode", "StyleModelApply": "Apply Style Model", "CLIPTextEncode": "CLIP Text Encode (Prompt)", "CLIPSetLastLayer": "CLIP Set Last Layer", "ConditioningCombine": "Conditioning (Combine)", "ConditioningAverage ": "Conditioning (Average)", "ConditioningConcat": "Conditioning (Concat)", "ConditioningSetArea": "Conditioning (Set Area)", "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", "ConditioningSetMask": "Conditioning (Set Mask)", "ControlNetApply": "Apply ControlNet (OLD)", "ControlNetApplyAdvanced": "Apply ControlNet", # Latent "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", "SetLatentNoiseMask": "Set Latent Noise Mask", "VAEDecode": "VAE Decode", "VAEEncode": "VAE Encode", "LatentRotate": "Rotate Latent", "LatentFlip": "Flip Latent", "LatentCrop": "Crop Latent", "EmptyLatentImage": "Empty Latent Image", "LatentUpscale": "Upscale Latent", "LatentUpscaleBy": "Upscale Latent By", "LatentComposite": "Latent Composite", "LatentBlend": "Latent Blend", "LatentFromBatch" : "Latent From Batch", "RepeatLatentBatch": "Repeat Latent Batch", # Image "SaveImage": "Save Image", "PreviewImage": "Preview Image", "LoadImage": "Load Image", "LoadImageMask": "Load Image (as Mask)", "LoadImageOutput": "Load Image (from Outputs)", "ImageScale": "Upscale Image", "ImageScaleBy": "Upscale Image By", "ImageInvert": "Invert Image", "ImagePadForOutpaint": "Pad Image for Outpainting", "ImageBatch": "Batch Images", "ImageCrop": "Image Crop", "ImageStitch": "Image Stitch", "ImageBlend": "Image Blend", "ImageBlur": "Image Blur", "ImageQuantize": "Image Quantize", "ImageSharpen": "Image Sharpen", "ImageScaleToTotalPixels": "Scale Image to Total Pixels", "GetImageSize": "Get Image Size", # _for_testing "VAEDecodeTiled": "VAE Decode (Tiled)", "VAEEncodeTiled": "VAE Encode (Tiled)", } EXTENSION_WEB_DIRS = {} # Dictionary of successfully loaded module names and associated directories. LOADED_MODULE_DIRS = {} def get_module_name(module_path: str) -> str: """ Returns the module name based on the given module path. Examples: get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node" get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes Args: module_path (str): The path of the module. Returns: str: The module name. 
""" base_path = os.path.basename(module_path) if os.path.isfile(module_path): base_path = os.path.splitext(base_path)[0] return base_path async def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool: module_name = get_module_name(module_path) if os.path.isfile(module_path): sp = os.path.splitext(module_path) module_name = sp[0] sys_module_name = module_name elif os.path.isdir(module_path): sys_module_name = module_path.replace(".", "_x_") try: logging.debug("Trying to load custom node {}".format(module_path)) if os.path.isfile(module_path): module_spec = importlib.util.spec_from_file_location(sys_module_name, module_path) module_dir = os.path.split(module_path)[0] else: module_spec = importlib.util.spec_from_file_location(sys_module_name, os.path.join(module_path, "__init__.py")) module_dir = module_path module = importlib.util.module_from_spec(module_spec) sys.modules[sys_module_name] = module module_spec.loader.exec_module(module) LOADED_MODULE_DIRS[module_name] = os.path.abspath(module_dir) try: from comfy_config import config_parser project_config = config_parser.extract_node_configuration(module_path) web_dir_name = project_config.tool_comfy.web if web_dir_name: web_dir_path = os.path.join(module_path, web_dir_name) if os.path.isdir(web_dir_path): project_name = project_config.project.name EXTENSION_WEB_DIRS[project_name] = web_dir_path logging.info("Automatically register web folder {} for {}".format(web_dir_name, project_name)) except Exception as e: logging.warning(f"Unable to parse pyproject.toml due to lack dependency pydantic-settings, please run 'pip install -r requirements.txt': {e}") if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) if os.path.isdir(web_dir): EXTENSION_WEB_DIRS[module_name] = web_dir # V1 node definition if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: for name, node_cls in module.NODE_CLASS_MAPPINGS.items(): if name not in ignore: NODE_CLASS_MAPPINGS[name] = node_cls node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) return True # V3 Extension Definition elif hasattr(module, "comfy_entrypoint"): entrypoint = getattr(module, "comfy_entrypoint") if not callable(entrypoint): logging.warning(f"comfy_entrypoint in {module_path} is not callable, skipping.") return False try: if inspect.iscoroutinefunction(entrypoint): extension = await entrypoint() else: extension = entrypoint() if not isinstance(extension, ComfyExtension): logging.warning(f"comfy_entrypoint in {module_path} did not return a ComfyExtension, skipping.") return False await extension.on_load() node_list = await extension.get_node_list() if not isinstance(node_list, list): logging.warning(f"comfy_entrypoint in {module_path} did not return a list of nodes, skipping.") return False for node_cls in node_list: node_cls: io.ComfyNode schema = node_cls.GET_SCHEMA() if schema.node_id not in ignore: NODE_CLASS_MAPPINGS[schema.node_id] = node_cls node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) if schema.display_name is not None: NODE_DISPLAY_NAME_MAPPINGS[schema.node_id] = schema.display_name return True except Exception as e: 
logging.warning(f"Error while calling comfy_entrypoint in {module_path}: {e}") return False else: logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS or NODES_LIST (need one).") return False except Exception as e: logging.warning(traceback.format_exc()) logging.warning(f"Cannot import {module_path} module for custom nodes: {e}") return False async def init_external_custom_nodes(): """ Initializes the external custom nodes. This function loads custom nodes from the specified folder paths and imports them into the application. It measures the import times for each custom node and logs the results. Returns: None """ base_node_names = set(NODE_CLASS_MAPPINGS.keys()) node_paths = folder_paths.get_folder_paths("custom_nodes") node_import_times = [] for custom_node_path in node_paths: possible_modules = os.listdir(os.path.realpath(custom_node_path)) if "__pycache__" in possible_modules: possible_modules.remove("__pycache__") for possible_module in possible_modules: module_path = os.path.join(custom_node_path, possible_module) if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue if module_path.endswith(".disabled"): continue if args.disable_all_custom_nodes and possible_module not in args.whitelist_custom_nodes: logging.info(f"Skipping {possible_module} due to disable_all_custom_nodes and whitelist_custom_nodes") continue if args.enable_manager: if comfyui_manager.should_be_disabled(module_path): logging.info(f"Blocked by policy: {module_path}") continue time_before = time.perf_counter() success = await load_custom_node(module_path, base_node_names, module_parent="custom_nodes") node_import_times.append((time.perf_counter() - time_before, module_path, success)) if len(node_import_times) > 0: logging.info("\nImport times for custom nodes:") for n in sorted(node_import_times): if n[2]: import_message = "" else: import_message = " (IMPORT FAILED)" logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1])) logging.info("") async def init_builtin_extra_nodes(): """ Initializes the built-in extra nodes in ComfyUI. This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI. If any of the extra node files fail to import, a warning message is logged. 
Returns: None """ extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") extras_files = [ "nodes_latent.py", "nodes_hypernetwork.py", "nodes_upscale_model.py", "nodes_post_processing.py", "nodes_mask.py", "nodes_compositing.py", "nodes_rebatch.py", "nodes_model_merging.py", "nodes_tomesd.py", "nodes_clip_sdxl.py", "nodes_canny.py", "nodes_freelunch.py", "nodes_custom_sampler.py", "nodes_hypertile.py", "nodes_model_advanced.py", "nodes_model_downscale.py", "nodes_images.py", "nodes_video_model.py", "nodes_train.py", "nodes_dataset.py", "nodes_sag.py", "nodes_perpneg.py", "nodes_stable3d.py", "nodes_sdupscale.py", "nodes_photomaker.py", "nodes_pixart.py", "nodes_cond.py", "nodes_morphology.py", "nodes_stable_cascade.py", "nodes_differential_diffusion.py", "nodes_ip2p.py", "nodes_model_merging_model_specific.py", "nodes_pag.py", "nodes_align_your_steps.py", "nodes_attention_multiply.py", "nodes_advanced_samplers.py", "nodes_webcam.py", "nodes_audio.py", "nodes_sd3.py", "nodes_gits.py", "nodes_controlnet.py", "nodes_hunyuan.py", "nodes_eps.py", "nodes_flux.py", "nodes_lora_extract.py", "nodes_torch_compile.py", "nodes_mochi.py", "nodes_slg.py", "nodes_mahiro.py", "nodes_lt_upsampler.py", "nodes_lt_audio.py", "nodes_lt.py", "nodes_hooks.py", "nodes_load_3d.py", "nodes_cosmos.py", "nodes_video.py", "nodes_lumina2.py", "nodes_wan.py", "nodes_lotus.py", "nodes_hunyuan3d.py", "nodes_primitive.py", "nodes_cfg.py", "nodes_optimalsteps.py", "nodes_hidream.py", "nodes_fresca.py", "nodes_apg.py", "nodes_preview_any.py", "nodes_ace.py", "nodes_string.py", "nodes_camera_trajectory.py", "nodes_edit_model.py", "nodes_tcfg.py", "nodes_context_windows.py", "nodes_qwen.py", "nodes_chroma_radiance.py", "nodes_model_patch.py", "nodes_easycache.py", "nodes_audio_encoder.py", "nodes_rope.py", "nodes_logic.py", "nodes_nop.py", "nodes_kandinsky5.py", "nodes_wanmove.py", "nodes_image_compare.py", "nodes_zimage.py", "nodes_lora_debug.py", "nodes_color.py", "nodes_toolkit.py", "nodes_replacements.py", ] import_failed = [] for node_file in extras_files: if not await load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"): import_failed.append(node_file) return import_failed async def init_builtin_api_nodes(): api_nodes_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_api_nodes") api_nodes_files = sorted(glob.glob(os.path.join(api_nodes_dir, "nodes_*.py"))) import_failed = [] for node_file in api_nodes_files: if not await load_custom_node(node_file, module_parent="comfy_api_nodes"): import_failed.append(os.path.basename(node_file)) return import_failed async def init_public_apis(): register_versions([ ComfyAPIWithVersion( version=getattr(v, "VERSION"), api_class=v ) for v in supported_versions ]) async def init_extra_nodes(init_custom_nodes=True, init_api_nodes=True): await init_public_apis() import_failed = await init_builtin_extra_nodes() import_failed_api = [] if init_api_nodes: import_failed_api = await init_builtin_api_nodes() if init_custom_nodes: await init_external_custom_nodes() else: logging.info("Skipping loading of custom nodes") if len(import_failed_api) > 0: logging.warning("WARNING: some comfy_api_nodes/ nodes did not import correctly. 
This may be because they are missing some dependencies.\n") for node in import_failed_api: logging.warning("IMPORT FAILED: {}".format(node)) logging.warning("\nThis issue might be caused by missing dependencies that were added the last time you updated ComfyUI.") if args.windows_standalone_build: logging.warning("Please run the update script: update/update_comfyui.bat") else: logging.warning("Please run: pip install -r requirements.txt") logging.warning("") if len(import_failed) > 0: logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") for node in import_failed: logging.warning("IMPORT FAILED: {}".format(node)) logging.warning("\nThis issue might be caused by missing dependencies that were added the last time you updated ComfyUI.") if args.windows_standalone_build: logging.warning("Please run the update script: update/update_comfyui.bat") else: logging.warning("Please run: pip install -r requirements.txt") logging.warning("") return import_failed
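# Startup usage sketch (illustrative only; the ComfyUI server performs this
# wiring itself, and the flag values below are just an example):
#
#   import asyncio
#   async def boot():
#       failed = await init_extra_nodes(init_custom_nodes=True, init_api_nodes=False)
#       if failed:
#           logging.warning("comfy_extras nodes that failed to import: %s", failed)
#   asyncio.run(boot())
#
# After the call, NODE_CLASS_MAPPINGS holds every successfully registered node
# class and NODE_DISPLAY_NAME_MAPPINGS the corresponding UI display names.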