|
from convert_diffusers_to_sdxl import convert_unet_state_dict |
|
from huggingface_hub import hf_hub_download, hf_hub_url, HfApi, HfFileSystem |
|
import gradio |
|
import gguf |
|
from map_streamer import InSafetensorsDict |
|
import os |
|
import requests |
|
import safetensors.torch |
|
import shutil |
|
import time |
|
import urllib |
|
from urllib.parse import urlparse, parse_qs, unquote |
|
import urllib.request |
|
|
|
def convert(intro, url, api_key, arch): |
|
path = urllib.parse.urlparse(url).path |
|
components = path.split('/') |
|
filename = components[-1] |
|
output_file = 'locked_model.safetensors' |
|
sd_fp16 = None |
|
print('Step 1/3') |
|
lock = Filelock(output_file) |
|
if not lock.acquire(): |
|
gradio.Error('Wait your turn in the queue.') |
|
raise Exception('Wait your turn in the queue.') |
|
else: |
|
if len(url.split('/')) == 2: |
|
if not check_hf_safety(url): |
|
lock.release() |
|
raise Exception('Unexpected error ;)') |
|
print('Download safetensors from {}.'.format(url)) |
|
try: |
|
|
|
|
|
fs = HfFileSystem() |
|
with fs.open('{}/unet/diffusion_pytorch_model.safetensors'.format(url), 'rb') as f: |
|
sd_fp16 = load_transformer_by_diffuser_checkpoint(sd=safetensors.torch.load(f.read())) |
|
except: |
|
lock.release() |
|
raise |
|
elif url.endswith('.safetensors'): |
|
fs = HfFileSystem() |
|
if not fs.exists(url): |
|
gradio.Warning('Wrong URL format.') |
|
raise Exception('Wrong URL format.') |
|
|
|
copy_path = url |
|
filename = filename.replace('.safetensors', '') |
|
else: |
|
if not check_model_safety(filename): |
|
lock.release() |
|
raise Exception('Unexpected error ;)') |
|
if 'download/models' not in url: |
|
|
|
lock.release() |
|
gradio.Warning('Wrong URL format.') |
|
raise Exception('Wrong URL format.') |
|
print('Download model by id {}.'.format(filename)) |
|
try: |
|
|
|
fs = HfFileSystem() |
|
copy_path = '{}/civit/{}.safetensors'.format(os.getenv('HF_MODEL_TARGET'), |
|
filename) |
|
if not fs.exists(copy_path): |
|
with fs.open(copy_path, 'wb') as f: |
|
download_file(url, f, api_key) |
|
|
|
|
|
|
|
|
|
except: |
|
lock.release() |
|
raise |
|
print('Step 2/3') |
|
|
|
|
|
try: |
|
if os.path.exists('/data'): |
|
print('Write to paid storage.') |
|
gguf_path = '/data/locked_model.gguf' |
|
else: |
|
gguf_path = 'locked_model.gguf' |
|
if sd_fp16: |
|
|
|
write(gguf_path, output_file, arch, sd_fp16) |
|
else: |
|
|
|
with fs.open(copy_path, 'rb') as f: |
|
safesd = InSafetensorsDict(f, 65536 * 1024) |
|
write_uf_by_original_checkpoint(gguf_path, safesd, arch) |
|
except OSError as e: |
|
lock.release() |
|
if os.path.exists(gguf_path): |
|
os.remove(gguf_path) |
|
gradio.Error(str(e)) |
|
raise |
|
print('Step 3/3') |
|
api = HfApi() |
|
api.upload_file(path_or_fileobj=gguf_path, |
|
path_in_repo=filename + '.comfyui.Q8.gguf', |
|
repo_id=os.getenv('HF_MODEL_TARGET'), |
|
repo_type='model') |
|
lock.release() |
|
gradio.Info('Download the file from {}/{}'.format(os.getenv('HF_MODEL_TARGET'), |
|
filename + '.comfyui.Q8.gguf')) |
|
print('{}/{}'.format(os.getenv('HF_MODEL_TARGET'), |
|
filename + '.comfyui.Q8.gguf')) |
|
|
|
def download_file(url: str, f, token: str): |
|
|
|
|
|
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Gecko/20100101 Firefox/119.0' |
|
|
|
headers = { |
|
'Authorization': f'Bearer {token}', |
|
'User-Agent': USER_AGENT, |
|
} |
|
|
|
|
|
class NoRedirection(urllib.request.HTTPErrorProcessor): |
|
def http_response(self, request, response): |
|
return response |
|
https_response = http_response |
|
|
|
request = urllib.request.Request(url, headers=headers) |
|
opener = urllib.request.build_opener(NoRedirection) |
|
response = opener.open(request) |
|
|
|
if response.status in [301, 302, 303, 307, 308]: |
|
redirect_url = response.getheader('Location') |
|
|
|
|
|
parsed_url = urlparse(redirect_url) |
|
query_params = parse_qs(parsed_url.query) |
|
content_disposition = query_params.get('response-content-disposition', [None])[0] |
|
|
|
if content_disposition: |
|
filename = unquote(content_disposition.split('filename=')[1].strip('"')) |
|
else: |
|
raise Exception('Unable to determine filename') |
|
|
|
response = urllib.request.urlopen(redirect_url) |
|
elif response.status == 404: |
|
raise Exception('File not found') |
|
else: |
|
raise Exception('No redirect found, something went wrong') |
|
|
|
total_size = response.getheader('Content-Length') |
|
|
|
if total_size is not None: |
|
total_size = int(total_size) |
|
|
|
|
|
downloaded = 0 |
|
start_time = time.time() |
|
|
|
CHUNK_SIZE = 1638400 |
|
while True: |
|
chunk_start_time = time.time() |
|
buffer = response.read(CHUNK_SIZE) |
|
chunk_end_time = time.time() |
|
|
|
if not buffer: |
|
break |
|
|
|
downloaded += len(buffer) |
|
f.write(buffer) |
|
chunk_time = chunk_end_time - chunk_start_time |
|
|
|
if chunk_time > 0: |
|
speed = len(buffer) / chunk_time / (1024 ** 2) |
|
|
|
if total_size is not None: |
|
progress = downloaded / total_size |
|
|
|
|
|
|
|
end_time = time.time() |
|
time_taken = end_time - start_time |
|
hours, remainder = divmod(time_taken, 3600) |
|
minutes, seconds = divmod(remainder, 60) |
|
|
|
if hours > 0: |
|
time_str = f'{int(hours)}h {int(minutes)}m {int(seconds)}s' |
|
elif minutes > 0: |
|
time_str = f'{int(minutes)}m {int(seconds)}s' |
|
else: |
|
time_str = f'{int(seconds)}s' |
|
|
|
|
|
print(f'Download completed. File saved as: {filename}') |
|
print(f'Downloaded in {time_str}') |
|
|
|
|
|
|
|
|
|
|
|
class Filelock: |
|
def __init__(self, file_path): |
|
self.file_path = file_path |
|
self.lock_path = "{}.lock".format(file_path) |
|
self.lock_file = None |
|
|
|
def acquire(self): |
|
if os.path.exists(self.lock_path): |
|
lock_stat = os.stat(self.lock_path) |
|
if time.time() - lock_stat.st_mtime > 900: |
|
os.remove(self.lock_path) |
|
if not os.path.exists(self.lock_path): |
|
try: |
|
self.lock_file = open(self.lock_path, 'w') |
|
self.lock_file.write(str(os.getpid())) |
|
self.lock_file.flush() |
|
return True |
|
except IOError: |
|
return False |
|
return False |
|
|
|
def release(self): |
|
if self.lock_file: |
|
self.lock_file.close() |
|
os.remove(self.lock_path) |
|
self.lock_file = None |
|
|
|
def check_hf_safety(repo_id): |
|
return 'porn' not in repo_id |
|
|
|
def check_model_safety(model_id): |
|
url = f"https://civitai.com/api/v1/model-versions/{model_id}" |
|
response = requests.get(url) |
|
data = response.json() |
|
|
|
model_id = data.get('model_id') |
|
|
|
if model_id: |
|
url = f"https://civitai.com/api/v1/models/{model_id}" |
|
response = requests.get(url) |
|
data = response.json() |
|
|
|
tags = data.get('tags', []) |
|
if 'porn' in tags: |
|
return False |
|
else: |
|
return True |
|
else: |
|
return True |
|
|
|
def load_transformer_by_diffuser_checkpoint(filepath=None, sd=None): |
|
if sd is None: |
|
sd = safetensors.torch.load_file(filepath) |
|
unet_state_dict = convert_unet_state_dict(sd) |
|
sd_copy = {"model.diffusion_model." + k: v for k, v in unet_state_dict.items()} |
|
|
|
return sd_copy |
|
|
|
def load_transformer_by_original_checkpoint(ckpt_path=None, sd=None): |
|
if sd is None: |
|
sd = safetensors.torch.load_file(ckpt_path) |
|
sd_copy = {} |
|
for key in sd.keys(): |
|
if key.startswith('model.diffusion_model.'): |
|
sd_copy[key] = sd[key] |
|
|
|
return sd_copy |
|
|
|
def write(target_path, checkpoint_path, arch, sd_fp16): |
|
writer = gguf.GGUFWriter(target_path, arch=arch) |
|
target_quant = gguf.GGMLQuantizationType.Q8_0 |
|
writer.add_quantization_version(gguf.GGML_QUANT_VERSION) |
|
writer.add_file_type(target_quant) |
|
sd = {} |
|
for key in sd_fp16.keys(): |
|
tensor = sd_fp16[key] |
|
if len(tensor.shape) == 1 or len(tensor.shape) == 4: |
|
q = gguf.GGMLQuantizationType.F16 |
|
else: |
|
q = target_quant |
|
sd[key] = gguf.quants.quantize(tensor.numpy(), q) |
|
writer.add_tensor(key, sd[key], raw_dtype=q) |
|
writer.write_header_to_file(target_path) |
|
writer.write_kv_data_to_file() |
|
writer.write_tensors_to_file() |
|
writer.close() |
|
|
|
def write_uf_by_original_checkpoint(target_path, |
|
sd, |
|
arch, |
|
target_quant=gguf.GGMLQuantizationType.Q8_0): |
|
writer = gguf.GGUFWriter(target_path, arch=arch) |
|
writer.add_quantization_version(gguf.GGML_QUANT_VERSION) |
|
writer.add_file_type(target_quant) |
|
for key, tensor in sd.items(): |
|
if key.startswith('model.diffusion_model.'): |
|
if len(tensor.shape) == 1 or len(tensor.shape) == 4: |
|
q = gguf.GGMLQuantizationType.F16 |
|
else: |
|
q = target_quant |
|
layer_values = gguf.quants.quantize(tensor.numpy(), q) |
|
writer.add_tensor(key, layer_values, raw_dtype=q) |
|
writer.write_header_to_file(target_path) |
|
writer.write_kv_data_to_file() |
|
writer.write_tensors_to_file() |
|
writer.close() |
|
|
|
intro = gradio.Markdown(""" |
|
## Convert a SDXL model to GGUF |
|
|
|
Convert a Pony/SDXL model's UNet to GGUF (Q8). |
|
|
|
The question is whether I can automate tasks to the extent that would allow me to spend more time with my cat at home. |
|
|
|
This space takes a diffusers file from 🤗, then converts it to [name your UI] compatible* format. The result should be avail in 10 minutes in the model directory. |
|
|
|
*That's an overstatement, as I only test it with my own comfy-gguf node. |
|
|
|
The url format must follow: |
|
|
|
*[hf-username]/[sdxl-repo-name]* which must lead to the /unet/diffusion_pytorch_model.safetensors. |
|
|
|
https://civitai.com/api/download/models/XXX?type=Model&format=SafeTensor&size=pruned&fp=fp16 |
|
|
|
### Disclaimer |
|
|
|
Use of this code requires citation and attribution to the author via a link to their Hugging Face profile in all resulting work. |
|
""") |
|
url = gradio.Textbox(label='Download url') |
|
api_key = gradio.Textbox(label='API key') |
|
arch = gradio.Textbox(label='Architecture', value='sdxl') |
|
|
|
if __name__ == '__main__': |
|
demo = gradio.Interface(convert, |
|
[intro, url, api_key, arch], |
|
outputs=None) |
|
demo.queue().launch() |
|
|