"""Gradio chat demo: retrieval-augmented Korean QA on top of a Llama-3 8B model.

For each user message the script embeds the question with a retriever model,
queries the ``commonsense`` Pinecone index for the single closest match, puts
the matched passage in front of the question, and streams the reply of the
causal language model back to the UI.  Every question/answer pair is appended
to a JSON-lines file inside a folder handed to a ``CommitScheduler`` that is
configured for the ``psyche/llama3-mrc-chat-log`` dataset repository.
"""

import json
import os
from collections.abc import Mapping
from datetime import datetime
from pathlib import Path
from threading import Thread
from typing import Iterator
from uuid import uuid4

import gradio as gr
import spaces
import torch
from huggingface_hub import CommitScheduler
from pinecone.grpc import PineconeGRPC as Pinecone
from transformers import (
    AutoModel,
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
)

# --- Chat-log persistence ----------------------------------------------------

HF_UPLOAD = os.environ.get("HF_UPLOAD")

JSON_DATASET_DIR = Path("json_dataset")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)
# A fresh file name per process so concurrent replicas never write to the same file.
JSON_DATASET_PATH = JSON_DATASET_DIR / f"train-{uuid4()}.json"

scheduler = CommitScheduler(
    repo_id="psyche/llama3-mrc-chat-log",
    repo_type="dataset",
    folder_path=JSON_DATASET_DIR,
    path_in_repo="data",
    token=HF_UPLOAD,
)

# --- Retrieval backend -------------------------------------------------------

pc = Pinecone(api_key=os.environ.get("PINECONE"))
index = pc.Index("commonsense")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

retriever_tokenizer = AutoTokenizer.from_pretrained("psyche/dpr-longformer-ko-4096")
retriever = AutoModel.from_pretrained("psyche/dpr-longformer-ko-4096")
retriever.eval()
retriever.to(device)


def save_json(question: str, answer: str) -> None:
    """Append one question/answer record to the JSON-lines chat log.

    The write happens while holding ``scheduler.lock`` so the file is not
    modified while the scheduler is using the folder.

    Args:
        question: The user message that was answered.
        answer: The full text generated for that message.
    """
    record = {
        "question": question,
        "answer": answer,
        "datetime": datetime.now().isoformat(),
        "label": "",
    }
    # Explicit UTF-8: records are written with ensure_ascii=False, so the
    # platform default encoding must not be relied on for Korean text.
    with scheduler.lock, JSON_DATASET_PATH.open("a", encoding="utf-8") as f:
        json.dump(record, f, ensure_ascii=False)
        f.write("\n")


# --- Generation settings -----------------------------------------------------

MAX_MAX_NEW_TOKENS = 8192  # upper bound of the "Max new tokens" slider
DEFAULT_MAX_NEW_TOKENS = 4096  # initial value of the "Max new tokens" slider
MAX_INPUT_TOKEN_LENGTH = 2048  # prompts longer than this are trimmed from the left

DESCRIPTION = "# Llama-3 8B Korean QA Chatbot"

# The chat model is only loaded when CUDA is available; on CPU these stay None
# and `generate` reports the problem instead of failing with a NameError.
model = None
tokenizer = None

if not torch.cuda.is_available():
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"

if torch.cuda.is_available():
    model_id = "psyche/llama3-8b-instruct-ko"
    model = AutoModelForCausalLM.from_pretrained(
        model_id, device_map="auto", load_in_4bit=True, revision="v4.2"
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)


def _metadata_to_text(metadata: object) -> str:
    """Turn the metadata of a Pinecone match into a plain context string."""
    if isinstance(metadata, str):
        return metadata
    if isinstance(metadata, Mapping):
        # NOTE(review): the field holding the passage is not visible in this
        # file; "text" is tried first, otherwise all string values are joined.
        # Confirm the actual key used when the index was populated.
        text = metadata.get("text")
        if isinstance(text, str) and text:
            return text
        return "\n".join(value for value in metadata.values() if isinstance(value, str))
    return ""


def _retrieve_context(message: str) -> str:
    """Return the text of the best index match for *message*, or "" if none."""
    retriever_inputs = retriever_tokenizer(
        [message], max_length=1024, truncation=True, return_tensors="pt"
    ).to(device)  # inputs must live on the same device as the retriever

    with torch.no_grad():
        embeddings = retriever(**retriever_inputs).pooler_output

    results = index.query(
        vector=embeddings[0].cpu().tolist(),  # plain list of floats for the client
        top_k=1,
        include_values=False,
        include_metadata=True,
    )
    matches = results["matches"]
    if not matches:
        return ""
    return _metadata_to_text(matches[0]["metadata"])


@spaces.GPU
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream an answer to *message*, grounded on one retrieved passage.

    Args:
        message: The new user message.
        chat_history: Earlier ``(user, assistant)`` turns of the conversation.
        system_prompt: Optional system message; skipped when empty.
        max_new_tokens: Generation length limit passed to ``model.generate``.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling threshold.
        top_k: Top-k sampling cutoff.
        repetition_penalty: Repetition penalty passed to ``model.generate``.

    Yields:
        The answer accumulated so far, once per chunk emitted by the streamer.

    Raises:
        gr.Error: If no chat model is loaded (no CUDA device was available).
    """
    if model is None or tokenizer is None:
        raise gr.Error("The chat model is not loaded: this demo requires a GPU.")

    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.extend(
            [
                {"role": "user", "content": user},
                {"role": "assistant", "content": assistant},
            ]
        )

    context = _retrieve_context(message)
    if context:
        # Korean prompt: "Referring to the context above, what is the answer
        # to the question '<message>'?"
        user_content = f"{context}\n\n위 문맥을 참고하여 질문 '{message}'에 답하면?"
    else:
        # Nothing retrieved: ask the question as-is rather than failing.
        user_content = message
    conversation.append({"role": "user", "content": user_content})

    input_ids = tokenizer.apply_chat_template(
        conversation, return_tensors="pt", add_generation_prompt=True
    )
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens; the oldest part of the prompt is dropped.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(
            f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens."
        )
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(
        tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
    )
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    # Generation runs in a background thread so this generator can consume the
    # streamer and forward partial output to the UI while tokens are produced.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)

    # Log the finished exchange once streaming has completed.
    save_json(message, "".join(outputs))


chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Temperature",
            minimum=0.01,
            maximum=4.0,
            step=0.1,
            value=0.01,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1.15,
        ),
    ],
    stop_btn=None,
    examples=[
        ["안녕?"],
        ["너가 할 수 있는게 뭐야?"],
        ["파이썬에 대해서 알려줘"],
        ["대한민국의 수도는?"],
        ["독도는 어느나라 땅이야?"],
    ],
)

with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    gr.DuplicateButton(value="Duplicate Space for private use", elem_id="duplicate-button")
    chat_interface.render()

if __name__ == "__main__":
    demo.queue(max_size=20).launch()