"""Gradio demo for PuLID identity-preserving generation on FLUX.1-dev.

Importing this module builds a global ``FluxGenerator`` (which loads every
model) so that ``generate_image`` can be used directly as a Gradio callback.
"""
import spaces
import time
import os

import gradio as gr
import torch
from einops import rearrange
from PIL import Image
from transformers import pipeline

from flux.cli import SamplingOptions
from flux.sampling import denoise, get_noise, get_schedule, prepare, unpack
from flux.util import load_ae, load_clip, load_flow_model, load_t5
from pulid.pipeline_flux import PuLIDPipeline
from pulid.utils import resize_numpy_image_long
from prompt_template import prompt_dict
from style_template import styles

# Images whose "nsfw" classifier score reaches this value are withheld.
NSFW_THRESHOLD = 0.85

PROMPTS_NAMES = list(prompt_dict.keys())
DEFAULT_PROMPT_NAME = "Paris"
STYLES_NAMES = list(styles.keys())
DEFAULT_STYLE_NAME = "No style (Default)"


def get_prompt(prompt_name: str):
    """Return the first element of the ``prompt_dict`` entry for *prompt_name*.

    Unknown names fall back to the ``DEFAULT_PROMPT_NAME`` entry.
    """
    prompt = prompt_dict.get(prompt_name, prompt_dict[DEFAULT_PROMPT_NAME])
    return prompt[0]


def apply_style(style_name: str, positive: str, negative: str = "") -> tuple[str, str]:
    """Apply a style template to a positive/negative prompt pair.

    The style's positive template has its ``{prompt}`` placeholder replaced by
    *positive*; the style's negative text is joined to *negative* with a
    single space. Unknown style names fall back to ``DEFAULT_STYLE_NAME``.

    Returns:
        ``(styled_positive, styled_negative)``.
    """
    p, n = styles.get(style_name, styles[DEFAULT_STYLE_NAME])
    return p.replace("{prompt}", positive), n + ' ' + negative


def get_models(name: str, device: torch.device, offload: bool):
    """Load the models used by the demo.

    The T5 and CLIP encoders and the NSFW classifier are created on *device*;
    the flow model and autoencoder are created on the CPU instead when
    *offload* is true.

    Returns:
        ``(model, ae, t5, clip, nsfw_classifier)``.
    """
    t5 = load_t5(device, max_length=128)
    clip = load_clip(device)
    model = load_flow_model(name, device="cpu" if offload else device)
    model.eval()
    ae = load_ae(name, device="cpu" if offload else device)
    nsfw_classifier = pipeline(
        "image-classification",
        model="Falconsai/nsfw_image_detection",
        device=device,
    )
    return model, ae, t5, clip, nsfw_classifier


class FluxGenerator:
    """Holds the loaded FLUX models and the PuLID pipeline built on them."""

    def __init__(self):
        # Configuration is fixed here; the CLI flags parsed at the bottom of
        # the file are not consulted by this class.
        self.device = torch.device('cuda')
        self.offload = False
        self.model_name = 'flux-dev'
        self.model, self.ae, self.t5, self.clip, self.nsfw_classifier = get_models(
            self.model_name,
            device=self.device,
            offload=self.offload,
        )
        self.pulid_model = PuLIDPipeline(self.model, 'cuda', weight_dtype=torch.bfloat16)
        self.pulid_model.load_pretrain()


# Built at import time: generate_image below reads this global.
flux_generator = FluxGenerator()


@spaces.GPU
@torch.inference_mode()
def generate_image(
        prompt_name,
        style_name,
        id_image,
        start_step,
        guidance,
        seed,
        true_cfg,
        width=896,
        height=1152,
        num_steps=20,
        id_weight=1.0,
        neg_prompt="bad quality, worst quality, text, signature, watermark, extra limbs , nudity ,Blurred face, low-quality details, exaggerated facial expressions, unrealistic skin texture, distorted proportions",  # noqa: E501
        timestep_to_start_cfg=1,
        max_sequence_length=128,
):
    """Generate one image from a named prompt/style and an optional ID image.

    Args:
        prompt_name: Key into ``prompt_dict`` (see ``get_prompt``).
        style_name: Key into ``styles`` (see ``apply_style``).
        id_image: Identity reference image, or ``None`` to skip ID embedding.
        start_step: Forwarded to ``denoise`` as ``start_step``.
        guidance: Guidance value stored in the sampling options.
        seed: Integer or numeric string; ``-1``, blank or ``None`` selects a
            random seed.
        true_cfg: True-CFG scale; true CFG (and the negative prompt) is only
            used when this differs from 1.0 by more than 0.01.
        width: Output width passed to the sampler.
        height: Output height passed to the sampler.
        num_steps: Number of schedule steps.
        id_weight: Forwarded to ``denoise`` as ``id_weight``.
        neg_prompt: Negative prompt, encoded only when true CFG is active.
        timestep_to_start_cfg: Forwarded to ``denoise``.
        max_sequence_length: Assigned to the T5 encoder's ``max_length``.

    Returns:
        ``(image, seed_text, debug_images)`` when the NSFW score is below
        ``NSFW_THRESHOLD``; otherwise ``(None, warning_text, debug_images)``.

    Raises:
        ValueError: If *seed* is non-blank and not an integer literal.
        IndexError: If the classifier output contains no ``"nsfw"`` label.
    """
    flux_generator.t5.max_length = max_sequence_length

    prompt = get_prompt(prompt_name)
    prompt = apply_style(style_name, prompt)[0]

    # The seed arrives from a free-text box; a cleared box means "random"
    # rather than an int() failure.
    seed_text = "" if seed is None else str(seed).strip()
    seed = int(seed_text) if seed_text else -1
    if seed == -1:
        seed = None

    opts = SamplingOptions(
        prompt=prompt,
        width=width,
        height=height,
        num_steps=num_steps,
        guidance=guidance,
        seed=seed,
    )

    if opts.seed is None:
        opts.seed = torch.Generator(device="cpu").seed()
    print(f"Generating '{opts.prompt}' with seed {opts.seed}")
    t0 = time.perf_counter()

    use_true_cfg = abs(true_cfg - 1.0) > 1e-2

    if id_image is not None:
        id_image = resize_numpy_image_long(id_image, 1024)
        id_embeddings, uncond_id_embeddings = flux_generator.pulid_model.get_id_embedding(
            id_image, cal_uncond=use_true_cfg
        )
    else:
        id_embeddings = None
        uncond_id_embeddings = None

    # prepare input
    x = get_noise(
        1,
        opts.height,
        opts.width,
        device=flux_generator.device,
        dtype=torch.bfloat16,
        seed=opts.seed,
    )
    timesteps = get_schedule(
        opts.num_steps,
        x.shape[-1] * x.shape[-2] // 4,
        shift=True,
    )

    if flux_generator.offload:
        flux_generator.t5 = flux_generator.t5.to(flux_generator.device)
        flux_generator.clip = flux_generator.clip.to(flux_generator.device)
    inp = prepare(t5=flux_generator.t5, clip=flux_generator.clip, img=x, prompt=opts.prompt)
    if use_true_cfg:
        inp_neg = prepare(t5=flux_generator.t5, clip=flux_generator.clip, img=x, prompt=neg_prompt)
    else:
        inp_neg = None

    # offload TEs to CPU, load model to gpu
    if flux_generator.offload:
        flux_generator.t5, flux_generator.clip = flux_generator.t5.cpu(), flux_generator.clip.cpu()
        torch.cuda.empty_cache()
        flux_generator.model = flux_generator.model.to(flux_generator.device)

    # denoise initial noise
    x = denoise(
        flux_generator.model,
        **inp,
        timesteps=timesteps,
        guidance=opts.guidance,
        id=id_embeddings,
        id_weight=id_weight,
        start_step=start_step,
        uncond_id=uncond_id_embeddings,
        true_cfg=true_cfg,
        timestep_to_start_cfg=timestep_to_start_cfg,
        neg_txt=inp_neg["txt"] if use_true_cfg else None,
        neg_txt_ids=inp_neg["txt_ids"] if use_true_cfg else None,
        neg_vec=inp_neg["vec"] if use_true_cfg else None,
    )

    # offload model, load autoencoder to gpu
    if flux_generator.offload:
        flux_generator.model.cpu()
        torch.cuda.empty_cache()
        flux_generator.ae.decoder.to(x.device)

    # decode latents to pixel space
    x = unpack(x.float(), opts.height, opts.width)
    with torch.autocast(device_type=flux_generator.device.type, dtype=torch.bfloat16):
        x = flux_generator.ae.decode(x)

    if flux_generator.offload:
        flux_generator.ae.decoder.cpu()
        torch.cuda.empty_cache()

    t1 = time.perf_counter()
    print(f"Done in {t1 - t0:.1f}s.")

    # bring into PIL format
    x = x.clamp(-1, 1)
    x = rearrange(x[0], "c h w -> h w c")
    img = Image.fromarray((127.5 * (x + 1.0)).cpu().byte().numpy())

    # Loop variable deliberately not named ``x`` so it cannot be confused with
    # the image tensor above.
    nsfw_score = [
        result["score"]
        for result in flux_generator.nsfw_classifier(img)
        if result["label"] == "nsfw"
    ][0]

    if nsfw_score < NSFW_THRESHOLD:
        return img, str(opts.seed), flux_generator.pulid_model.debug_img_list
    return (
        None,
        f"Your generated image may contain NSFW (with nsfw_score: {nsfw_score}) content",
        flux_generator.pulid_model.debug_img_list,
    )


_HEADER_ = '''
Logo

2B AI PHOTOMAKER

'''  # noqa E501


def create_demo(args,
                model_name: str,
                device: str = "cuda" if torch.cuda.is_available() else "cpu",
                offload: bool = False):
    """Build the Gradio Blocks UI wired to ``generate_image``.

    Args:
        args: Parsed CLI namespace; only ``args.dev`` is read here, to toggle
            visibility of the development-only widgets.
        model_name: Accepted for interface compatibility; not used in this
            function (the global ``flux_generator`` fixes the model).
        device: Accepted for interface compatibility; not used here.
        offload: Accepted for interface compatibility; not used here.

    Returns:
        The constructed ``gr.Blocks`` demo (not yet launched).
    """
    # NOTE(review): widget nesting below was reconstructed from statement
    # order; confirm the intended column/accordion layout.
    with gr.Blocks() as demo:
        gr.Markdown(_HEADER_)

        with gr.Row():
            with gr.Column():
                prompt_name = gr.Dropdown(label="Prompt", choices=PROMPTS_NAMES, value=DEFAULT_PROMPT_NAME)
                style_name = gr.Dropdown(label="Style", choices=STYLES_NAMES, value=DEFAULT_STYLE_NAME)
                id_image = gr.Image(label="ID Image")
                generate_btn = gr.Button("Generate")
                id_weight = gr.Slider(0.0, 3.0, 1, step=0.05, label="id weight")

                width = gr.Slider(256, 1536, 896, step=16, label="Width")
                height = gr.Slider(256, 1536, 1152, step=16, label="Height")
                num_steps = gr.Slider(1, 20, 20, step=1, label="Number of steps")
                start_step = gr.Slider(0, 10, 0, step=1, label="timestep to start inserting ID")
                guidance = gr.Slider(1.0, 10.0, 4, step=0.1, label="Guidance")
                seed = gr.Textbox(-1, label="Seed (-1 for random)")
                max_sequence_length = gr.Slider(128, 512, 128, step=128,
                                                label="max_sequence_length for prompt (T5), small will be faster")

                with gr.Accordion("Advanced Options (True CFG, true_cfg_scale=1 means use fake CFG, >1 means use true CFG, if using true CFG, we recommend set the guidance scale to 1)", open=False):  # noqa E501
                    neg_prompt = gr.Textbox(
                        label="Negative Prompt",
                        value="bad quality, worst quality, text, signature, watermark, extra limbs")
                    true_cfg = gr.Slider(1.0, 10.0, 1, step=0.1, label="true CFG scale")
                    timestep_to_start_cfg = gr.Slider(0, 20, 1, step=1, label="timestep to start cfg",
                                                      visible=args.dev)

            with gr.Column():
                output_image = gr.Image(label="Generated Image", format='png')
                seed_output = gr.Textbox(label="Used Seed")
                intermediate_output = gr.Gallery(label='Output', elem_id="gallery", visible=args.dev)

        # Input order must match generate_image's positional parameters.
        generate_btn.click(
            fn=generate_image,
            inputs=[prompt_name, style_name, id_image, start_step, guidance, seed, true_cfg, width, height,
                    num_steps, id_weight, neg_prompt, timestep_to_start_cfg, max_sequence_length],
            outputs=[output_image, seed_output, intermediate_output],
        )

    return demo


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="PuLID for FLUX.1-dev")
    # choices must be a list of names; list('flux-dev') would split the string
    # into single characters and reject "--name flux-dev".
    parser.add_argument("--name", type=str, default="flux-dev", choices=['flux-dev'],
                        help="currently only support flux-dev")
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu",
                        help="Device to use")
    parser.add_argument("--offload", action="store_true", help="Offload model to CPU when not in use")
    parser.add_argument("--port", type=int, default=8080, help="Port to use")
    parser.add_argument("--dev", action='store_true', help="Development mode")
    parser.add_argument("--pretrained_model", type=str, help='for development')
    args = parser.parse_args()

    import huggingface_hub

    # NOTE(review): the models are already loaded at import time (see
    # flux_generator above), i.e. before this login runs; confirm that
    # credentials are available earlier if the weights require them.
    huggingface_hub.login(os.getenv('HF_TOKEN'))

    demo = create_demo(args, args.name, args.device, args.offload)
    demo.launch()