Add support for FLUX.2 klein base models

This commit is contained in:
Jaret Burkett
2026-01-17 17:46:25 -07:00
parent 0efed794b4
commit a6da9e37ac
8 changed files with 361 additions and 56 deletions

View File

@@ -17,6 +17,35 @@ class Flux2Params:
axes_dim: list[int] = field(default_factory=lambda: [32, 32, 32, 32])
theta: int = 2000
mlp_ratio: float = 3.0
use_guidance_embed: bool = True
@dataclass
class Klein9BParams:
in_channels: int = 128
context_in_dim: int = 12288
hidden_size: int = 4096
num_heads: int = 32
depth: int = 8
depth_single_blocks: int = 24
axes_dim: list[int] = field(default_factory=lambda: [32, 32, 32, 32])
theta: int = 2000
mlp_ratio: float = 3.0
use_guidance_embed: bool = False
@dataclass
class Klein4BParams:
in_channels: int = 128
context_in_dim: int = 7680
hidden_size: int = 3072
num_heads: int = 24
depth: int = 5
depth_single_blocks: int = 20
axes_dim: list[int] = field(default_factory=lambda: [32, 32, 32, 32])
theta: int = 2000
mlp_ratio: float = 3.0
use_guidance_embed: bool = False
class FakeConfig:
@@ -50,11 +79,14 @@ class Flux2(nn.Module):
self.time_in = MLPEmbedder(
in_dim=256, hidden_dim=self.hidden_size, disable_bias=True
)
self.guidance_in = MLPEmbedder(
in_dim=256, hidden_dim=self.hidden_size, disable_bias=True
)
self.txt_in = nn.Linear(params.context_in_dim, self.hidden_size, bias=False)
self.use_guidance_embed = params.use_guidance_embed
if self.use_guidance_embed:
self.guidance_in = MLPEmbedder(
in_dim=256, hidden_dim=self.hidden_size, disable_bias=True
)
self.double_blocks = nn.ModuleList(
[
DoubleStreamBlock(
@@ -116,14 +148,15 @@ class Flux2(nn.Module):
timesteps: Tensor,
ctx: Tensor,
ctx_ids: Tensor,
guidance: Tensor,
guidance: Tensor | None,
):
num_txt_tokens = ctx.shape[1]
timestep_emb = timestep_embedding(timesteps, 256)
vec = self.time_in(timestep_emb)
guidance_emb = timestep_embedding(guidance, 256)
vec = vec + self.guidance_in(guidance_emb)
if self.use_guidance_embed:
guidance_emb = timestep_embedding(guidance, 256)
vec = vec + self.guidance_in(guidance_emb)
double_block_mod_img = self.double_stream_modulation_img(vec)
double_block_mod_txt = self.double_stream_modulation_txt(vec)

View File

@@ -4,8 +4,6 @@ import numpy as np
import torch
import PIL.Image
from dataclasses import dataclass
from typing import List, Union
from diffusers.image_processor import VaeImageProcessor
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from diffusers.utils import (
@@ -13,14 +11,10 @@ from diffusers.utils import (
)
from diffusers.utils.torch_utils import randn_tensor
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.utils import BaseOutput
from .autoencoder import AutoEncoder
from .model import Flux2
from einops import rearrange
from transformers import AutoProcessor, Mistral3ForConditionalGeneration
from .sampling import (
@@ -41,7 +35,8 @@ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
SYSTEM_MESSAGE = """You are an AI that reasons about image descriptions. You give structured responses focusing on object relationships, object
attribution and actions without speculation."""
OUTPUT_LAYERS = [10, 20, 30]
OUTPUT_LAYERS_MISTRAL = [10, 20, 30]
OUTPUT_LAYERS_QWEN3 = [9, 18, 27]
MAX_LENGTH = 512
@@ -56,6 +51,8 @@ class Flux2Pipeline(DiffusionPipeline):
text_encoder: Mistral3ForConditionalGeneration,
tokenizer: AutoProcessor,
transformer: Flux2,
text_encoder_type: str = "mistral", # "mistral" or "qwen"
is_guidance_distilled: bool = False,
):
super().__init__()
@@ -70,6 +67,8 @@ class Flux2Pipeline(DiffusionPipeline):
self.num_channels_latents = 128
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
self.default_sample_size = 64
self.text_encoder_type = text_encoder_type
self.is_guidance_distilled = is_guidance_distilled
def format_input(
self,
@@ -138,12 +137,66 @@ class Flux2Pipeline(DiffusionPipeline):
use_cache=False,
)
out = torch.stack([output.hidden_states[k] for k in OUTPUT_LAYERS], dim=1)
out = torch.stack(
[output.hidden_states[k] for k in OUTPUT_LAYERS_MISTRAL], dim=1
)
prompt_embeds = rearrange(out, "b c l d -> b l (c d)")
# they don't return attention mask, so we create it here
return prompt_embeds, None
def _get_qwen_prompt_embeds(
self,
prompt: Union[str, List[str]] = None,
device: Optional[torch.device] = None,
dtype: Optional[torch.dtype] = None,
max_sequence_length: int = 512,
):
device = device or self._execution_device
dtype = dtype or self.text_encoder.dtype
if not isinstance(prompt, list):
prompt = [prompt]
all_input_ids = []
all_attention_masks = []
for p in prompt:
messages = [{"role": "user", "content": p}]
text = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False,
)
model_inputs = self.tokenizer(
text,
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=max_sequence_length,
)
all_input_ids.append(model_inputs["input_ids"])
all_attention_masks.append(model_inputs["attention_mask"])
input_ids = torch.cat(all_input_ids, dim=0).to(device)
attention_mask = torch.cat(all_attention_masks, dim=0).to(device)
output = self.text_encoder(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
use_cache=False,
)
out = torch.stack([output.hidden_states[k] for k in OUTPUT_LAYERS_QWEN3], dim=1)
prompt_embeds = rearrange(out, "b c l d -> b l (c d)")
# they dont use attention mask
return prompt_embeds, None
def encode_prompt(
self,
prompt: Union[str, List[str]],
@@ -159,9 +212,18 @@ class Flux2Pipeline(DiffusionPipeline):
batch_size = len(prompt) if prompt_embeds is None else prompt_embeds.shape[0]
if prompt_embeds is None:
prompt_embeds, prompt_embeds_mask = self._get_mistral_prompt_embeds(
prompt, device, max_sequence_length=max_sequence_length
)
if self.text_encoder_type == "mistral":
prompt_embeds, prompt_embeds_mask = self._get_mistral_prompt_embeds(
prompt, device, max_sequence_length=max_sequence_length
)
elif self.text_encoder_type == "qwen":
prompt_embeds, prompt_embeds_mask = self._get_qwen_prompt_embeds(
prompt, device, max_sequence_length=max_sequence_length
)
else:
raise ValueError(
f"Unsupported text_encoder_type: {self.text_encoder_type}"
)
_, seq_len, _ = prompt_embeds.shape
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
@@ -220,6 +282,7 @@ class Flux2Pipeline(DiffusionPipeline):
def __call__(
self,
prompt: Union[str, List[str]] = None,
negative_prompt: Optional[Union[str, List[str]]] = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
@@ -229,6 +292,8 @@ class Flux2Pipeline(DiffusionPipeline):
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
prompt_embeds_mask: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds_mask: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
max_sequence_length: int = 512,
@@ -236,6 +301,11 @@ class Flux2Pipeline(DiffusionPipeline):
):
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
do_guidance = (
guidance_scale is not None
and guidance_scale > 1.0
and not self.is_guidance_distilled
)
self._guidance_scale = guidance_scale
self._current_timestep = None
@@ -263,6 +333,19 @@ class Flux2Pipeline(DiffusionPipeline):
)
txt, txt_ids = batched_prc_txt(prompt_embeds)
neg_txt, neg_txt_ids = None, None
if do_guidance:
negative_prompt_embeds, _ = self.encode_prompt(
prompt=negative_prompt,
prompt_embeds=negative_prompt_embeds,
prompt_embeds_mask=negative_prompt_embeds_mask,
device=device,
num_images_per_prompt=num_images_per_prompt,
max_sequence_length=max_sequence_length,
)
neg_txt, neg_txt_ids = batched_prc_txt(negative_prompt_embeds)
# 4. Prepare latent variables\
latents = self.prepare_latents(
@@ -329,6 +412,17 @@ class Flux2Pipeline(DiffusionPipeline):
guidance=guidance_vec,
)
if do_guidance:
pred_uncond = self.transformer(
x=img_input,
x_ids=img_input_ids,
timesteps=t_vec,
ctx=neg_txt,
ctx_ids=neg_txt_ids,
guidance=guidance_vec,
)
pred = pred_uncond + guidance_scale * (pred - pred_uncond)
if img_cond_seq is not None:
pred = pred[:, : packed_latents.shape[1]]