from convert_diffusers_to_sdxl import convert_unet_state_dict
from huggingface_hub import hf_hub_download, hf_hub_url, HfApi, HfFileSystem
import gradio
import gguf
import os
import requests
import safetensors.torch
from safetensors.torch import load_file
import shutil
import time
import urllib
from urllib.parse import urlparse, parse_qs, unquote
import urllib.request


def convert(intro, url, api_key, arch):
    path = urllib.parse.urlparse(url).path
    components = path.split('/')
    filename = components[-1]
    output_file = 'locked_model.safetensors'
    print('Step 1/3')
    lock = Filelock(output_file)
    if not os.path.exists(output_file):
        if len(url.split('/')) == 2:
            if not check_hf_safety(url):
                raise Exception('Unexpected error ;)')
            if not lock.acquire():
                raise Exception('Wait your turn in the queue.')
            print('Download safetensors from {}.'.format(url))
            try:
                # We won't download the file with hf_hub_download or urllib.request,
                # but access it remotely.
                fs = HfFileSystem()
                with fs.open('{}/unet/diffusion_pytorch_model.safetensors'.format(url), 'rb') as f:
                    byte_data = f.read()
                sd_fp16 = load_transformer_by_diffuser_checkpoint(sd=safetensors.torch.load(byte_data))
            except:
                lock.release()
                raise
        else:
            if not check_model_safety(filename):
                raise Exception('Unexpected error ;)')
            if not lock.acquire():
                raise Exception('Wait your turn in the queue.')
            print('Download model by id {}.'.format(filename))
            try:
                # Save a hf copy of the remote file, then access it remotely.
                fs = HfFileSystem()
                copy_path = 'twodgirl/wild-sdxl/civit/{}.safetensors'.format(filename)
                with fs.open(copy_path, 'wb') as f:
                    download_file(url, f, api_key)
                with fs.open(copy_path, 'rb') as f:
                    byte_data = f.read()
                sd_fp16 = load_transformer_by_original_checkpoint(sd=safetensors.torch.load(byte_data))
            except:
                lock.release()
                raise
    print('Step 2/3')
    if os.path.exists(output_file):
        # The free Hugging Face Space runs out of disk space otherwise.
        os.remove(output_file)
    write('locked_model.gguf', output_file, arch, sd_fp16)
    print('Step 3/3')
    api = HfApi()
    api.upload_file(path_or_fileobj='locked_model.gguf',
                    path_in_repo=filename + '.comfyui.Q8.gguf',
                    repo_id='twodgirl/wild-sdxl',
                    repo_type='model')
    lock.release()
    gradio.Info('Download the file from twodgirl/wild-sdxl/{}'.format(filename + '.comfyui.Q8.gguf'))
    print(output_file)
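# Hedged sketch (never called): how convert() could be driven outside the Gradio UI.
# The repo id, model-version id and token below are placeholders, not real values.
def _example_convert_calls():
    # A 🤗 diffusers repo id '<user>/<repo>' that contains unet/diffusion_pytorch_model.safetensors.
    convert(None, 'someuser/some-sdxl-repo', api_key='', arch='sdxl')
    # A CivitAI download URL; the last path component ('123456') is treated as the model-version id.
    convert(None, 'https://civitai.com/api/download/models/123456', api_key='your-civitai-token', arch='sdxl')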
def download_file(url: str, f, token: str):
    ###
    # Code from ashleykleynhans/civitai-downloader.
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Gecko/20100101 Firefox/119.0'
    headers = {
        'Authorization': f'Bearer {token}',
        'User-Agent': USER_AGENT,
    }

    # Disable automatic redirect handling.
    class NoRedirection(urllib.request.HTTPErrorProcessor):
        def http_response(self, request, response):
            return response

        https_response = http_response

    request = urllib.request.Request(url, headers=headers)
    opener = urllib.request.build_opener(NoRedirection)
    response = opener.open(request)
    if response.status in [301, 302, 303, 307, 308]:
        redirect_url = response.getheader('Location')
        # Extract the filename from the redirect URL.
        parsed_url = urlparse(redirect_url)
        query_params = parse_qs(parsed_url.query)
        content_disposition = query_params.get('response-content-disposition', [None])[0]
        if content_disposition:
            filename = unquote(content_disposition.split('filename=')[1].strip('"'))
        else:
            raise Exception('Unable to determine filename')
        response = urllib.request.urlopen(redirect_url)
    elif response.status == 404:
        raise Exception('File not found')
    else:
        raise Exception('No redirect found, something went wrong')
    total_size = response.getheader('Content-Length')
    if total_size is not None:
        total_size = int(total_size)
    # Stream the body into the given file pointer.
    downloaded = 0
    start_time = time.time()
    CHUNK_SIZE = 1638400
    while True:
        chunk_start_time = time.time()
        buffer = response.read(CHUNK_SIZE)
        chunk_end_time = time.time()
        if not buffer:
            break
        downloaded += len(buffer)
        f.write(buffer)
        chunk_time = chunk_end_time - chunk_start_time
        if chunk_time > 0:
            speed = len(buffer) / chunk_time / (1024 ** 2)  # Speed in MB/s.
        if total_size is not None:
            progress = downloaded / total_size
            # sys.stdout.write(f'\rDownloading: {filename} [{progress*100:.2f}%] - {speed:.2f} MB/s')
            # sys.stdout.flush()
    end_time = time.time()
    time_taken = end_time - start_time
    hours, remainder = divmod(time_taken, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours > 0:
        time_str = f'{int(hours)}h {int(minutes)}m {int(seconds)}s'
    elif minutes > 0:
        time_str = f'{int(minutes)}m {int(seconds)}s'
    else:
        time_str = f'{int(seconds)}s'
    # sys.stdout.write('\n')
    print(f'Download completed. File saved as: {filename}')
    print(f'Downloaded in {time_str}')

###
# huggingface/twodgirl.
# License: apache-2.0

class Filelock:
    def __init__(self, file_path):
        self.file_path = file_path
        self.lock_path = '{}.lock'.format(file_path)
        self.lock_file = None

    def acquire(self):
        if os.path.exists(self.lock_path):
            lock_stat = os.stat(self.lock_path)
            # Break a stale lock after 15 minutes.
            if time.time() - lock_stat.st_mtime > 900:
                os.remove(self.lock_path)
        if not os.path.exists(self.lock_path):
            try:
                self.lock_file = open(self.lock_path, 'w')
                self.lock_file.write(str(os.getpid()))
                self.lock_file.flush()
                return True
            except IOError:
                return False
        return False

    def release(self):
        if self.lock_file:
            self.lock_file.close()
            os.remove(self.lock_path)
            self.lock_file = None


def check_hf_safety(repo_id):
    return 'porn' not in repo_id


def check_model_safety(model_id):
    url = f'https://civitai.com/api/v1/model-versions/{model_id}'
    response = requests.get(url)
    data = response.json()
    # The CivitAI API returns camelCase keys.
    model_id = data.get('modelId')
    if model_id:
        url = f'https://civitai.com/api/v1/models/{model_id}'
        response = requests.get(url)
        data = response.json()
        tags = data.get('tags', [])

        return 'porn' not in tags

    return True


def load_transformer_by_diffuser_checkpoint(filepath=None, sd=None):
    if sd is None:
        sd = load_file(filepath)
    unet_state_dict = convert_unet_state_dict(sd)
    sd_copy = {'model.diffusion_model.' + k: v for k, v in unet_state_dict.items()}

    return sd_copy


def load_transformer_by_original_checkpoint(ckpt_path=None, sd=None):
    if sd is None:
        sd = load_file(ckpt_path)
    sd_copy = {}
    for key in sd.keys():
        if key.startswith('model.diffusion_model.'):
            sd_copy[key] = sd[key]

    return sd_copy


def write(target_path, checkpoint_path, arch, sd_fp16):
    writer = gguf.GGUFWriter(target_path, arch=arch)
    target_quant = gguf.GGMLQuantizationType.Q8_0
    writer.add_quantization_version(gguf.GGML_QUANT_VERSION)
    writer.add_file_type(target_quant)
    sd = {}
    for key in sd_fp16.keys():
        tensor = sd_fp16[key]
        # Keep 1D (bias/norm) and 4D (conv) tensors in F16, quantize the rest to Q8_0.
        if len(tensor.shape) == 1 or len(tensor.shape) == 4:
            q = gguf.GGMLQuantizationType.F16
        else:
            q = target_quant
        sd[key] = gguf.quants.quantize(tensor.numpy(), q)
        writer.add_tensor(key, sd[key], raw_dtype=q)
    writer.write_header_to_file(target_path)
    writer.write_kv_data_to_file()
    writer.write_tensors_to_file()
    writer.close()
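# Hedged sketch (never called): sanity-check the file produced by write() with the
# reader from the same gguf package. 'locked_model.gguf' matches the path used in
# convert(); everything else here is illustrative.
def _example_inspect_gguf(path='locked_model.gguf'):
    reader = gguf.GGUFReader(path)
    for tensor in reader.tensors[:5]:
        # Each entry reports its name, quantization type and shape.
        print(tensor.name, tensor.tensor_type, tensor.shape)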
intro = gradio.Markdown("""
## Convert an SDXL model to GGUF

Convert a Pony/SDXL model's UNet to GGUF (Q8).

The question is whether I can automate tasks to the extent that would allow me to spend more time with my cat at home.

This space takes a diffusers file from 🤗, then converts it to a [name your UI] compatible* format.
The result should be available within 10 minutes in the twodgirl/wild-sdxl model directory.

*That's an overstatement, as I only test it with my own comfy-gguf node.

The url format must follow *[hf-username]/[sdxl-repo-name]*, which must lead to the repo's /unet/diffusion_pytorch_model.safetensors file.

### Disclaimer

Use of this code requires citation and attribution to the author via a link to their Hugging Face profile in all resulting work.
""")
url = gradio.Textbox(label='Download url')
api_key = gradio.Textbox(label='API key')
arch = gradio.Textbox(label='Architecture', value='sdxl')

if __name__ == '__main__':
    demo = gradio.Interface(convert, [intro, url, api_key, arch], outputs=None)
    demo.queue().launch()
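# Hedged example (kept commented out so nothing runs after launch()): fetching the
# converted file once convert() has uploaded it. '123456' stands in for the CivitAI
# model-version id or HF repo name that convert() used as `filename`.
# hf_hub_download(repo_id='twodgirl/wild-sdxl', filename='123456.comfyui.Q8.gguf')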