import csv
import gradio as gr
import open_clip
import os
import torch
import base64

from PIL import Image

import clip_interrogator
from clip_interrogator import Config, Interrogator

import modules.generation_parameters_copypaste as parameters_copypaste
from modules import devices, lowvram, script_callbacks, shared

from pydantic import BaseModel, Field
from fastapi import FastAPI
from fastapi.exceptions import HTTPException
from io import BytesIO

__version__ = "0.2.1"

# Interrogator instance shared by the UI tabs and the API routes; created on
# the first call to load().
ci = None
# Recomputed by add_tab(); when true, load() applies the low-VRAM defaults.
low_vram = False

# BatchWriter selects its behaviour by position in this list, so the order of
# the entries must not change.
BATCH_OUTPUT_MODES = [
    'Text file for each image',
    'Single text file with all prompts',
    'csv file with columns for filenames and prompts',
]


class BatchWriter:
    """Write batch prompts to disk in one of the BATCH_OUTPUT_MODES formats.

    Usable as a context manager so the combined output file (if any) is
    closed even when processing is aborted by an exception.
    """

    def __init__(self, folder, mode):
        self.folder = folder
        self.mode = mode
        self.csv, self.file = None, None
        if mode == BATCH_OUTPUT_MODES[1]:
            self.file = open(os.path.join(folder, 'batch.txt'), 'w', encoding='utf-8')
        elif mode == BATCH_OUTPUT_MODES[2]:
            # newline='' lets the csv module control line endings itself.
            self.file = open(os.path.join(folder, 'batch.csv'), 'w', encoding='utf-8', newline='')
            self.csv = csv.writer(self.file, quoting=csv.QUOTE_MINIMAL)
            self.csv.writerow(['filename', 'prompt'])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress the exception that ended the with-block.
        return False

    def add(self, file, prompt):
        """Record `prompt` for image file name `file` according to the mode."""
        if self.mode == BATCH_OUTPUT_MODES[0]:
            txt_file = os.path.splitext(file)[0] + ".txt"
            with open(os.path.join(self.folder, txt_file), 'w', encoding='utf-8') as f:
                f.write(prompt)
        elif self.mode == BATCH_OUTPUT_MODES[1]:
            self.file.write(f"{prompt}\n")
        elif self.mode == BATCH_OUTPUT_MODES[2]:
            self.csv.writerow([file, prompt])

    def close(self):
        """Close the combined output file if one was opened; safe to repeat."""
        if self.file is not None:
            self.file.close()
            self.file = None


def load(clip_model_name):
    """Create the global Interrogator if needed and select `clip_model_name`."""
    global ci
    if ci is None:
        print(f"Loading CLIP Interrogator {clip_interrogator.__version__}...")

        config = Config(
            device=devices.get_optimal_device(),
            cache_path='models/clip-interrogator',
            clip_model_name=clip_model_name,
        )
        if low_vram:
            config.apply_low_vram_defaults()
        ci = Interrogator(config)

    # An existing instance may have been built for a different CLIP model.
    if clip_model_name != ci.config.clip_model_name:
        ci.config.clip_model_name = clip_model_name
        ci.load_clip_model()


def unload():
    """Move the caption and CLIP models to the CPU and flag them as offloaded.

    Does nothing when no Interrogator has been created yet.
    """
    global ci
    if ci is not None:
        print("Offloading CLIP Interrogator...")
        ci.caption_model = ci.caption_model.to(devices.cpu)
        ci.clip_model = ci.clip_model.to(devices.cpu)
        ci.caption_offloaded = True
        ci.clip_offloaded = True
        devices.torch_gc()


def _open_rgb(path):
    """Open the image at `path`, return an RGB copy and close the file."""
    with Image.open(path) as img:
        return img.convert('RGB')


def image_analysis(image, clip_model_name):
    """Rank `image` against the interrogator's five label tables.

    Returns a 5-tuple of dicts (medium, artist, movement, trending, flavor),
    each mapping the top five labels of that table to the similarity value
    returned by `ci.similarities`.
    """
    load(clip_model_name)

    image = image.convert('RGB')
    image_features = ci.image_to_features(image)

    tables = (ci.mediums, ci.artists, ci.movements, ci.trendings, ci.flavors)
    # Rank every table first, then score, keeping the original call order.
    top_labels = [table.rank(image_features, 5) for table in tables]
    medium_ranks, artist_ranks, movement_ranks, trending_ranks, flavor_ranks = (
        dict(zip(labels, ci.similarities(image_features, labels)))
        for labels in top_labels
    )

    return medium_ranks, artist_ranks, movement_ranks, trending_ranks, flavor_ranks


def interrogate(image, mode, caption=None):
    """Produce a prompt for `image` using the loaded Interrogator.

    `mode` is one of 'best', 'caption', 'classic', 'fast' or 'negative'.
    `caption` is forwarded to the interrogator where the mode accepts it;
    in 'caption' mode it is returned as-is when provided.

    Raises:
        ValueError: if `mode` is not one of the supported modes.
    """
    if mode == 'best':
        return ci.interrogate(image, caption=caption)
    if mode == 'caption':
        return ci.generate_caption(image) if caption is None else caption
    if mode == 'classic':
        return ci.interrogate_classic(image, caption=caption)
    if mode == 'fast':
        return ci.interrogate_fast(image, caption=caption)
    if mode == 'negative':
        return ci.interrogate_negative(image)
    raise ValueError(f"Unknown mode {mode}")


def image_to_prompt(image, mode, clip_model_name):
    """Generate a prompt for a single image.

    Runtime errors (including CUDA out-of-memory) are printed and reported
    through the returned text instead of being raised. Any other exception
    propagates, but the shared job state is always ended first.
    """
    shared.state.begin()
    shared.state.job = 'interrogate'

    try:
        if shared.cmd_opts.lowvram or shared.cmd_opts.medvram:
            lowvram.send_everything_to_cpu()
            devices.torch_gc()

        load(clip_model_name)
        image = image.convert('RGB')
        prompt = interrogate(image, mode)
    except torch.cuda.OutOfMemoryError as e:
        prompt = "Ran out of VRAM"
        print(e)
    except RuntimeError as e:
        prompt = f"Exception {type(e)}"
        print(e)
    finally:
        # End the job even when an unexpected exception is propagating.
        shared.state.end()

    return prompt


# NOTE(review): the widget nesting in the *_tab builders below was
# reconstructed from a whitespace-collapsed source; confirm the layout
# against the rendered UI.

def about_tab():
    """Build the static "About" tab."""
    gr.Markdown("## 🕵️‍♂️ CLIP Interrogator 🕵️‍♂️")
    gr.Markdown("*Want to figure out what a good prompt might be to create new images like an existing one? The CLIP Interrogator is here to get you answers!*")

    gr.Markdown("## Notes")
    gr.Markdown(
        "CLIP models:\n"
        "* For best prompts with Stable Diffusion 1.* choose the **ViT-L-14/openai** model.\n"
        "* For best prompts with Stable Diffusion 2.* choose the **ViT-H-14/laion2b_s32b_b79k** model.\n"
        "* For best prompts with Stable Diffusion XL choose **ViT-L-14/openai** or **ViT-bigG-14/laion2b_s39b** model.\n"
        "\nOther:\n"
        "* When you are done click the **Unload** button to free up memory."
    )

    gr.Markdown("## Github")
    gr.Markdown("If you have any issues please visit [CLIP Interrogator on Github](https://github.com/pharmapsychotic/clip-interrogator) and drop a star if you like it!")

    # NOTE(review): the line breaks in this string and in the low-VRAM note
    # below are kept as they appear in the source; a lone newline does not
    # force a break in Markdown, so they may originally have been <br> tags.
    gr.Markdown(f"\n\nCLIP Interrogator version: {clip_interrogator.__version__}\nExtension version: {__version__}")

    if torch.cuda.is_available():
        device = devices.get_optimal_device()
        vram_total_mb = torch.cuda.get_device_properties(device).total_memory / (1024**2)
        vram_info = f"GPU VRAM: **{vram_total_mb:.2f}MB**"
        if low_vram:
            vram_info += "\nUsing low VRAM configuration"
        gr.Markdown(vram_info)


def get_models():
    """Return the open_clip pretrained entries joined as 'a/b' strings."""
    return ['/'.join(x) for x in open_clip.list_pretrained()]


def analyze_tab():
    """Build the "Analyze" tab and wire its button to image_analysis()."""
    with gr.Column():
        with gr.Row():
            image = gr.Image(type='pil', label="Image")
            model = gr.Dropdown(get_models(), value='ViT-L-14/openai', label='CLIP Model')
        with gr.Row():
            medium = gr.Label(label="Medium", num_top_classes=5)
            artist = gr.Label(label="Artist", num_top_classes=5)
            movement = gr.Label(label="Movement", num_top_classes=5)
            trending = gr.Label(label="Trending", num_top_classes=5)
            flavor = gr.Label(label="Flavor", num_top_classes=5)
    button = gr.Button("Analyze", variant='primary')
    button.click(image_analysis, inputs=[image, model], outputs=[medium, artist, movement, trending, flavor])


def batch_tab():
    """Build the "Batch" tab for interrogating every image in a folder."""

    def batch_process(folder, clip_model, mode, output_mode):
        """Caption, then interrogate, each .png/.jpg/.jpeg file in `folder`.

        Results are written through BatchWriter using `output_mode`.
        Unreadable images are reported and skipped. Runtime errors are
        printed; the shared job state and progress bar are always reset.
        """
        if not os.path.exists(folder):
            print(f"Folder {folder} does not exist")
            return
        if not os.path.isdir(folder):
            print(f"{folder} is not a directory")
            return

        files = [f for f in os.listdir(folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        if not files:
            print("Folder has no images")
            return

        shared.state.begin()
        shared.state.job = 'batch interrogate'

        try:
            if shared.cmd_opts.lowvram or shared.cmd_opts.medvram:
                lowvram.send_everything_to_cpu()
                devices.torch_gc()

            load(clip_model)

            shared.total_tqdm.updateTotal(len(files))
            ci.config.quiet = True

            # generate captions in first pass
            captions = []
            for file in files:
                # Check before the try so an interrupt never records a
                # caption for a file that was not processed.
                if shared.state.interrupted:
                    break
                try:
                    caption = ci.generate_caption(_open_rgb(os.path.join(folder, file)))
                except OSError as e:
                    print(f"{e}; continuing")
                    caption = ""
                captions.append(caption)
                shared.total_tqdm.update()

            # interrogate in second pass
            with BatchWriter(folder, output_mode) as writer:
                shared.total_tqdm.clear()
                shared.total_tqdm.updateTotal(len(files))
                # zip() stops at the shorter list, so an interrupted first
                # pass can never cause an out-of-range caption lookup.
                for file, caption in zip(files, captions):
                    if shared.state.interrupted:
                        break
                    try:
                        image = _open_rgb(os.path.join(folder, file))
                        prompt = interrogate(image, mode, caption=caption)
                        writer.add(file, prompt)
                    except OSError as e:
                        print(f" {e}, continuing")
                    finally:
                        shared.total_tqdm.update()

            unload()
        except torch.cuda.OutOfMemoryError as e:
            print(e)
            print("Ran out of VRAM!")
        except RuntimeError as e:
            print(e)
        finally:
            # load() may have failed before the interrogator existed.
            if ci is not None:
                ci.config.quiet = False
            shared.state.end()
            shared.total_tqdm.clear()

    with gr.Column():
        with gr.Row():
            folder = gr.Text(label="Images folder", value="", interactive=True)
        with gr.Row():
            clip_model = gr.Dropdown(get_models(), value='ViT-L-14/openai', label='CLIP Model')
            mode = gr.Radio(['caption', 'best', 'fast', 'classic', 'negative'], label='Prompt Mode', value='fast')
            output_mode = gr.Dropdown(BATCH_OUTPUT_MODES, value=BATCH_OUTPUT_MODES[0], label='Output Mode')
        with gr.Row():
            button = gr.Button("Go!", variant='primary')
            interrupt = gr.Button('Interrupt', visible=True)
            interrupt.click(fn=lambda: shared.state.interrupt(), inputs=[], outputs=[])

    button.click(batch_process, inputs=[folder, clip_model, mode, output_mode], outputs=[])


def prompt_tab():
    """Build the "Prompt" tab: single-image interrogation plus paste buttons."""
    with gr.Column():
        with gr.Row():
            image = gr.Image(type='pil', label="Image")
            with gr.Column():
                mode = gr.Radio(['best', 'fast', 'classic', 'negative'], label='Mode', value='best')
                clip_model = gr.Dropdown(get_models(), value='ViT-L-14/openai', label='CLIP Model')
        prompt = gr.Textbox(label="Prompt", lines=3)
    with gr.Row():
        button = gr.Button("Generate", variant='primary')
        unload_button = gr.Button("Unload")
    with gr.Row():
        buttons = parameters_copypaste.create_buttons(["txt2img", "img2img", "inpaint", "extras"])

    button.click(image_to_prompt, inputs=[image, mode, clip_model], outputs=prompt)
    unload_button.click(unload)

    # A distinct loop name keeps the Generate button above from being shadowed.
    for tabname, paste_button in buttons.items():
        parameters_copypaste.register_paste_params_button(parameters_copypaste.ParamBinding(
            paste_button=paste_button,
            tabname=tabname,
            source_text_component=prompt,
            source_image_component=image,
        ))


def add_tab():
    """Decide the low-VRAM setting and build the extension's UI tab.

    Low-VRAM mode is enabled when the lowvram/medvram command line options
    are set, or when CUDA is available and the device reports at most
    12 GiB of total memory.
    """
    global low_vram
    low_vram = shared.cmd_opts.lowvram or shared.cmd_opts.medvram
    if not low_vram and torch.cuda.is_available():
        device = devices.get_optimal_device()
        vram_total = torch.cuda.get_device_properties(device).total_memory
        if vram_total <= 12*1024*1024*1024:
            low_vram = True

    with gr.Blocks(analytics_enabled=False) as ui:
        with gr.Tab("Prompt"):
            prompt_tab()
        with gr.Tab("Analyze"):
            analyze_tab()
        with gr.Tab("Batch"):
            batch_tab()
        with gr.Tab("About"):
            about_tab()

    return [(ui, "Interrogator", "interrogator")]


# decode_base64_to_image from modules/api/api.py, could be imported from there
def decode_base64_to_image(encoding):
    """Decode a Base64 string (optionally a data:image/ URL) to a PIL image.

    Raises:
        HTTPException: status 500 when the payload cannot be decoded.
    """
    try:
        # Parsing the prefix inside the try turns a malformed data URL into
        # the same HTTPException as any other undecodable payload.
        if encoding.startswith("data:image/"):
            encoding = encoding.split(";")[1].split(",")[1]
        return Image.open(BytesIO(base64.b64decode(encoding)))
    except Exception as e:
        raise HTTPException(status_code=500, detail="Invalid encoded image") from e


# Request body for /interrogator/analyze. Documented with comments rather than
# docstrings so the generated API schema text is left unchanged.
class InterrogatorAnalyzeRequest(BaseModel):
    image: str = Field(
        default="",
        title="Image",
        description="Image to work on, must be a Base64 string containing the image's data.",
    )
    clip_model_name: str = Field(
        default="ViT-L-14/openai",
        title="Model",
        description="The interrogate model used. See the models endpoint for a list of available models.",
    )


# Request body for /interrogator/prompt: the analyze fields plus a prompt mode.
class InterrogatorPromptRequest(InterrogatorAnalyzeRequest):
    mode: str = Field(
        default="fast",
        title="Mode",
        description="The mode used to generate the prompt. Can be one of: best, fast, classic, negative.",
    )


def mount_interrogator_api(_: gr.Blocks, app: FastAPI):
    # Registers the /interrogator/* routes on `app`. The handlers are
    # documented with comments rather than docstrings so the generated API
    # descriptions are left unchanged.

    @app.get("/interrogator/models")
    async def get_models():
        return ["/".join(x) for x in open_clip.list_pretrained()]

    @app.post("/interrogator/prompt")
    async def get_prompt(analyzereq: InterrogatorPromptRequest):
        image_b64 = analyzereq.image
        # The field is a str defaulting to "", so test for emptiness; an
        # `is None` check could never fire.
        if not image_b64:
            raise HTTPException(status_code=404, detail="Image not found")

        img = decode_base64_to_image(image_b64)
        prompt = image_to_prompt(img, analyzereq.mode, analyzereq.clip_model_name)
        return {"prompt": prompt}

    @app.post("/interrogator/analyze")
    async def analyze(analyzereq: InterrogatorAnalyzeRequest):
        image_b64 = analyzereq.image
        if not image_b64:
            raise HTTPException(status_code=404, detail="Image not found")

        img = decode_base64_to_image(image_b64)
        (
            medium_ranks,
            artist_ranks,
            movement_ranks,
            trending_ranks,
            flavor_ranks,
        ) = image_analysis(img, analyzereq.clip_model_name)
        return {
            "medium": medium_ranks,
            "artist": artist_ranks,
            "movement": movement_ranks,
            "trending": trending_ranks,
            "flavor": flavor_ranks,
        }


script_callbacks.on_app_started(mount_interrogator_api)
script_callbacks.on_ui_tabs(add_tab)